cluster

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

README

Proto.Actor Cluster - Virtual Actors (Alpha)

Massively distributed actors for GO

Proto.Actor supports the classic actor model also found in Erlang and Akka.
Our cluster support however uses a different approach, Virtual Actor Model.

This is a model where each actor appears to always exist. There is no lifecycle as in the classic actor model. You get a reference to the actor by asking for it's ID.

e.g.

hello := shared.GetHelloGrain("abc")
res := hello.SayHello(&shared.HelloRequest{Name: "Proto.Actor"})

This will ask the cluster where the 'abc' actor is located. If it does not yet exist, it will be created for you.

See Microsoft Orleans for more info about the Virtual Actor Model: http://dotnet.github.io/orleans/

How to

Protobuf IDL Definition

Start by defining your messages and grain contracts. You do this by using Protobuf IDL files.

Here is the definition from the /examples/cluster/shared example

syntax = "proto3";
package shared;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

message AddRequest {
  double a = 1;
  double b = 2;
}

message AddResponse {
  double result = 1;
}

service Hello {
  rpc SayHello (HelloRequest) returns (HelloResponse) {} 
  rpc Add(AddRequest) returns (AddResponse) {}
}

Once you have this, you can generate your code using the protobuf protoc compiler.

Windows

#generate messages
protoc -I=. -I=%GOPATH%\src --gogoslick_out=. protos.proto
#generate grains 
protoc -I=. -I=%GOPATH%\src --gorleans_out=. protos.proto 

Implementing

Once the messages and contracts have been generated, you can start implementing your own business logic. This is essentially a type which is powered by a Proto.Actor actor behind the scenes.

package shared

// a Go struct implementing the Hello interface
type hello struct {
}

func (*hello) SayHello(r *HelloRequest) *HelloResponse {
	return &HelloResponse{Message: "hello " + r.Name}
}

func (*hello) Add(r *AddRequest) *AddResponse {
	return &AddResponse{Result: r.A + r.B}
}

// Register what implementation Proto.Actor should use when 
// creating actors for a certain grain type.
func init() {
	// apply DI and setup logic
	HelloFactory(func() Hello { return &hello{} })
}

Seed nodes

func main() {
    cluster.Start("127.0.0.1:7711")
    console.ReadLine()
}

Member nodes

func main() {
	cluster.Start("127.0.0.1:0", "127.0.0.1:7711")

    // get a reference to the virtual actor called "abc" of type Hello
	hello := shared.GetHelloGrain("abc")
	res := hello.SayHello(&shared.HelloRequest{Name: "Proto.Actor"})
	log.Printf("Message from grain %v", res.Message)
}

FAQ

Can I use Proto.Actor Cluster in production?

The Proto.Actor Cluster support is in alpha version, thus not production ready.

What about performance?

Proto.Actor Remoting is able to pass 1 million+ messages per second on a standard dev machine. This is the same infrastructure used in Proto.Actor cluster. Proto.Actor Cluster however uses an RPC API, meaning it is Request/Response in nature. If you wait for a response for each call, the throughput will ofcourse be a lot less. Async Fire and forget for performance, Request/Response for simplicity.

Documentation

Index

Constants

View Source
const (
	ErrorReason_OK                  = "OK"
	ErrorReason_CANCELLED           = "CANCELLED"
	ErrorReason_UNKNOWN             = "UNKNOWN"
	ErrorReason_INVALID_ARGUMENT    = "INVALID_ARGUMENT"
	ErrorReason_DEADLINE_EXCEEDED   = "DEADLINE_EXCEEDED"
	ErrorReason_NOT_FOUND           = "NOT_FOUND"
	ErrorReason_ALREADY_EXISTS      = "ALREADY_EXISTS"
	ErrorReason_PERMISSION_DENIED   = "PERMISSION_DENIED"
	ErrorReason_RESOURCE_EXHAUSTED  = "RESOURCE_EXHAUSTED"
	ErrorReason_FAILED_PRECONDITION = "FAILED_PRECONDITION"
	ErrorReason_ABORTED             = "ABORTED"
	ErrorReason_OUT_OF_RANGE        = "OUT_OF_RANGE"
	ErrorReason_UNIMPLEMENTED       = "UNIMPLEMENTED"
	ErrorReason_INTERNAL            = "INTERNAL"
	ErrorReason_UNAVAILABLE         = "UNAVAILABLE"
	ErrorReason_DATA_LOSS           = "DATA_LOSS"
	ErrorReason_UNAUTHENTICATED     = "UNAUTHENTICATED"
)
View Source
const (
	TopologyKey       string = "topology"
	HearthbeatKey     string = "heathbeat"
	GracefullyLeftKey string = "left"
)
View Source
const DefaultGossipActorName string = "gossip"
View Source
const PubSubDeliveryName = "$pubsub-delivery"
View Source
const TopicActorKind = "prototopic"

Variables

View Source
var (
	IdentityHandoverAck_State_name = map[int32]string{
		0: "processed",
		1: "incorrect_topology",
	}
	IdentityHandoverAck_State_value = map[string]int32{
		"processed":          0,
		"incorrect_topology": 1,
	}
)

Enum value maps for IdentityHandoverAck_State.

View Source
var (
	DeliveryStatus_name = map[int32]string{
		0:   "Delivered",
		1:   "SubscriberNoLongerReachable",
		2:   "Timeout",
		127: "OtherError",
	}
	DeliveryStatus_value = map[string]int32{
		"Delivered":                   0,
		"SubscriberNoLongerReachable": 1,
		"Timeout":                     2,
		"OtherError":                  127,
	}
)

Enum value maps for DeliveryStatus.

View Source
var (
	PublishStatus_name = map[int32]string{
		0: "Ok",
		1: "Failed",
	}
	PublishStatus_value = map[string]int32{
		"Ok":     0,
		"Failed": 1,
	}
)

Enum value maps for PublishStatus.

View Source
var FailBatchAndContinue = NewPublishingErrorDecision(0)

FailBatchAndContinue skips the current batch and proceeds to the next one. The delivery reports (tasks) related to that batch are still failed with the exception that triggered the error handling.

View Source
var FailBatchAndStop = NewPublishingErrorDecision(0)

FailBatchAndStop causes the BatchingProducer to stop and fail the pending messages

View Source
var File_cluster_proto protoreflect.FileDescriptor
View Source
var File_gossip_proto protoreflect.FileDescriptor
View Source
var File_grain_proto protoreflect.FileDescriptor
View Source
var File_pubsub_proto protoreflect.FileDescriptor
View Source
var File_pubsub_test_proto protoreflect.FileDescriptor
View Source
var RetryBatchImmediately = NewPublishingErrorDecision(0)

RetryBatchImmediately retries the current batch immediately

Functions

func MembersToMap

func MembersToMap(members Members) map[string]*Member

func NewGossipConsensusHandler

func NewGossipConsensusHandler() *gossipConsensusHandler

func Reason

func Reason(err error) string

func RemotePlacementActor

func RemotePlacementActor(address string) *actor.PID

RemotePlacementActor returns the PID of the remote placement actor

func SetClusterIdentity

func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)

func SortMembers

func SortMembers(members Members)

func TopologyHash

func TopologyHash(members Members) uint64

func WithClusterIdentity

func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props

Types

type Acknowledge

type Acknowledge struct {
	// contains filtered or unexported fields
}

func (*Acknowledge) Descriptor deprecated

func (*Acknowledge) Descriptor() ([]byte, []int)

Deprecated: Use Acknowledge.ProtoReflect.Descriptor instead.

func (*Acknowledge) ProtoMessage

func (*Acknowledge) ProtoMessage()

func (*Acknowledge) ProtoReflect

func (x *Acknowledge) ProtoReflect() protoreflect.Message

func (*Acknowledge) Reset

func (x *Acknowledge) Reset()

func (*Acknowledge) String

func (x *Acknowledge) String() string

type ActivatedKind

type ActivatedKind struct {
	Kind     string
	Props    *actor.Props
	Strategy MemberStrategy
	// contains filtered or unexported fields
}

func (*ActivatedKind) Dev

func (ak *ActivatedKind) Dev()

func (*ActivatedKind) Inc

func (ak *ActivatedKind) Inc()

type Activation

type Activation struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

func (*Activation) Descriptor deprecated

func (*Activation) Descriptor() ([]byte, []int)

Deprecated: Use Activation.ProtoReflect.Descriptor instead.

func (*Activation) GetClusterIdentity

func (x *Activation) GetClusterIdentity() *ClusterIdentity

func (*Activation) GetPid

func (x *Activation) GetPid() *actor.PID

func (*Activation) ProtoMessage

func (*Activation) ProtoMessage()

func (*Activation) ProtoReflect

func (x *Activation) ProtoReflect() protoreflect.Message

func (*Activation) Reset

func (x *Activation) Reset()

func (*Activation) String

func (x *Activation) String() string

type ActivationRequest

type ActivationRequest struct {
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	RequestId       string           `protobuf:"bytes,2,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
	TopologyHash    uint64           `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ActivationRequest) Descriptor deprecated

func (*ActivationRequest) Descriptor() ([]byte, []int)

Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead.

func (*ActivationRequest) GetClusterIdentity

func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity

func (*ActivationRequest) GetRequestId

func (x *ActivationRequest) GetRequestId() string

func (*ActivationRequest) GetTopologyHash

func (x *ActivationRequest) GetTopologyHash() uint64

func (*ActivationRequest) ProtoMessage

func (*ActivationRequest) ProtoMessage()

func (*ActivationRequest) ProtoReflect

func (x *ActivationRequest) ProtoReflect() protoreflect.Message

func (*ActivationRequest) Reset

func (x *ActivationRequest) Reset()

func (*ActivationRequest) String

func (x *ActivationRequest) String() string

type ActivationResponse

type ActivationResponse struct {
	Pid          *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	Failed       bool       `protobuf:"varint,2,opt,name=failed,proto3" json:"failed,omitempty"`
	TopologyHash uint64     `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ActivationResponse) Descriptor deprecated

func (*ActivationResponse) Descriptor() ([]byte, []int)

Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead.

func (*ActivationResponse) GetFailed

func (x *ActivationResponse) GetFailed() bool

func (*ActivationResponse) GetPid

func (x *ActivationResponse) GetPid() *actor.PID

func (*ActivationResponse) GetTopologyHash

func (x *ActivationResponse) GetTopologyHash() uint64

func (*ActivationResponse) ProtoMessage

func (*ActivationResponse) ProtoMessage()

func (*ActivationResponse) ProtoReflect

func (x *ActivationResponse) ProtoReflect() protoreflect.Message

func (*ActivationResponse) Reset

func (x *ActivationResponse) Reset()

func (*ActivationResponse) String

func (x *ActivationResponse) String() string

type ActivationTerminated

type ActivationTerminated struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

Terminated, removed from lookup

func (*ActivationTerminated) Descriptor deprecated

func (*ActivationTerminated) Descriptor() ([]byte, []int)

Deprecated: Use ActivationTerminated.ProtoReflect.Descriptor instead.

func (*ActivationTerminated) GetClusterIdentity

func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity

func (*ActivationTerminated) GetPid

func (x *ActivationTerminated) GetPid() *actor.PID

func (*ActivationTerminated) ProtoMessage

func (*ActivationTerminated) ProtoMessage()

func (*ActivationTerminated) ProtoReflect

func (x *ActivationTerminated) ProtoReflect() protoreflect.Message

func (*ActivationTerminated) Reset

func (x *ActivationTerminated) Reset()

func (*ActivationTerminated) String

func (x *ActivationTerminated) String() string

type ActivationTerminating

type ActivationTerminating struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

Started terminating, not yet removed from IIdentityLookup

func (*ActivationTerminating) Descriptor deprecated

func (*ActivationTerminating) Descriptor() ([]byte, []int)

Deprecated: Use ActivationTerminating.ProtoReflect.Descriptor instead.

func (*ActivationTerminating) GetClusterIdentity

func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity

func (*ActivationTerminating) GetPid

func (x *ActivationTerminating) GetPid() *actor.PID

func (*ActivationTerminating) ProtoMessage

func (*ActivationTerminating) ProtoMessage()

func (*ActivationTerminating) ProtoReflect

func (x *ActivationTerminating) ProtoReflect() protoreflect.Message

func (*ActivationTerminating) Reset

func (x *ActivationTerminating) Reset()

func (*ActivationTerminating) String

func (x *ActivationTerminating) String() string

type ActorStatistics

type ActorStatistics struct {
	ActorCount map[string]int64 `` /* 180-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*ActorStatistics) Descriptor deprecated

func (*ActorStatistics) Descriptor() ([]byte, []int)

Deprecated: Use ActorStatistics.ProtoReflect.Descriptor instead.

func (*ActorStatistics) GetActorCount

func (x *ActorStatistics) GetActorCount() map[string]int64

func (*ActorStatistics) ProtoMessage

func (*ActorStatistics) ProtoMessage()

func (*ActorStatistics) ProtoReflect

func (x *ActorStatistics) ProtoReflect() protoreflect.Message

func (*ActorStatistics) Reset

func (x *ActorStatistics) Reset()

func (*ActorStatistics) String

func (x *ActorStatistics) String() string

type AddConsensusCheck

type AddConsensusCheck struct {
	ID    string
	Check *ConsensusCheck
}

func NewAddConsensusCheck

func NewAddConsensusCheck(id string, check *ConsensusCheck) AddConsensusCheck

type BatchingProducer

type BatchingProducer struct {
	// contains filtered or unexported fields
}

func NewBatchingProducer

func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer

func (*BatchingProducer) Dispose

func (p *BatchingProducer) Dispose()

Dispose stops the producer and releases all resources.

func (*BatchingProducer) Produce

func (p *BatchingProducer) Produce(ctx context.Context, message proto.Message) (*ProduceProcessInfo, error)

Produce a message to producer queue. The return info can be used to wait for the message to be published.

type BatchingProducerConfig

type BatchingProducerConfig struct {
	// Maximum size of the published batch. Default: 2000.
	BatchSize int
	// Max size of the requests waiting in queue. If value is provided, the producer will throw
	// ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded
	// Note that bounded queue has better performance than unbounded queue.
	// Default: 0 (unbounded)
	MaxQueueSize int

	// How long to wait for the publishing to complete.
	// Default: 5s
	PublishTimeout time.Duration

	// Error handler that can decide what to do with an error when publishing a batch.
	// Default: Fail and stop the BatchingProducer
	OnPublishingError PublishingErrorHandler

	// A throttle for logging from this producer. By default, a throttle shared between all instances of
	// BatchingProducer is used, that allows for 10 events in 1 second.
	LogThrottle actor.ShouldThrottle

	// Optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean
	// up code to recover resources.
	PublisherIdleTimeout time.Duration
}

type BatchingProducerConfigOption

type BatchingProducerConfigOption func(config *BatchingProducerConfig)

func WithBatchingProducerBatchSize

func WithBatchingProducerBatchSize(batchSize int) BatchingProducerConfigOption

WithBatchingProducerBatchSize sets maximum size of the published batch. Default: 2000.

func WithBatchingProducerLogThrottle

func WithBatchingProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchingProducerConfigOption

WithBatchingProducerLogThrottle sets a throttle for logging from this producer. By default, a throttle shared between all instances of BatchingProducer is used, that allows for 10 events in 10 seconds.

func WithBatchingProducerMaxQueueSize

func WithBatchingProducerMaxQueueSize(maxQueueSize int) BatchingProducerConfigOption

WithBatchingProducerMaxQueueSize set max size of the requests waiting in queue. If value is provided, the producer will throw ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded Note that bounded queue has better performance than unbounded queue. Default: 0 (unbounded)

func WithBatchingProducerOnPublishingError

func WithBatchingProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchingProducerConfigOption

WithBatchingProducerOnPublishingError sets error handler that can decide what to do with an error when publishing a batch. Default: Fail and stop the BatchingProducer

func WithBatchingProducerPublishTimeout

func WithBatchingProducerPublishTimeout(publishTimeout time.Duration) BatchingProducerConfigOption

WithBatchingProducerPublishTimeout sets how long to wait for the publishing to complete. Default: 5s

func WithBatchingProducerPublisherIdleTimeout

func WithBatchingProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchingProducerConfigOption

WithBatchingProducerPublisherIdleTimeout sets an optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean up code to recover resources.

type Cluster

type Cluster struct {
	ActorSystem    *actor.ActorSystem
	Config         *Config
	Gossip         *Gossiper
	PubSub         *PubSub
	Remote         *remote.Remote
	PidCache       *PidCacheValue
	MemberList     *MemberList
	IdentityLookup IdentityLookup
	// contains filtered or unexported fields
}

func GetCluster

func GetCluster(actorSystem *actor.ActorSystem) *Cluster

func New

func New(actorSystem *actor.ActorSystem, config *Config) *Cluster

func (*Cluster) BatchingProducer

func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer

BatchingProducer create a new PubSub batching producer for specified topic, that publishes directly to the topic actor

func (*Cluster) ExtensionID

func (c *Cluster) ExtensionID() extensions.ExtensionID

func (*Cluster) Get

func (c *Cluster) Get(identity string, kind string) *actor.PID

func (*Cluster) GetBlockedMembers

func (c *Cluster) GetBlockedMembers() set.Set[string]

func (*Cluster) GetClusterKind

func (c *Cluster) GetClusterKind(kind string) *ActivatedKind

func (*Cluster) GetClusterKinds

func (c *Cluster) GetClusterKinds() []string

func (*Cluster) Logger

func (c *Cluster) Logger() *slog.Logger

func (*Cluster) Publisher

func (c *Cluster) Publisher() Publisher

Publisher creates a new PubSub publisher that publishes messages directly to the TopicActor

func (*Cluster) Request

func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error)

func (*Cluster) RequestFuture

func (c *Cluster) RequestFuture(identity string, kind string, message interface{}, option ...GrainCallOption) (*actor.Future, error)

func (*Cluster) Shutdown

func (c *Cluster) Shutdown(graceful bool)

func (*Cluster) StartClient

func (c *Cluster) StartClient()

func (*Cluster) StartMember

func (c *Cluster) StartMember()

func (*Cluster) SubscribeByClusterIdentity

func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error)

SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity

func (*Cluster) SubscribeByPid

func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error)

SubscribeByPid subscribes to a PubSub topic by subscriber PID

func (*Cluster) SubscribeWithReceive

func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error)

SubscribeWithReceive subscribe to a PubSub topic by providing a Receive function, that will be used to spawn a subscriber actor

func (*Cluster) TryGetClusterKind

func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)

func (*Cluster) UnsubscribeByClusterIdentity

func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error)

UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity

func (*Cluster) UnsubscribeByIdentityAndKind

func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error)

UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity

func (*Cluster) UnsubscribeByPid

func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error)

UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID

type ClusterContextConfig

type ClusterContextConfig struct {
	RequestsLogThrottlePeriod                    time.Duration
	MaxNumberOfEventsInRequestLogThrottledPeriod int
	// contains filtered or unexported fields
}

ClusterContextConfig is used to configure cluster context parameters

type ClusterIdentity

type ClusterIdentity struct {
	Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
	Kind     string `protobuf:"bytes,2,opt,name=kind,proto3" json:"kind,omitempty"`
	// contains filtered or unexported fields
}

func GetClusterIdentity

func GetClusterIdentity(ctx actor.ExtensionContext) *ClusterIdentity

func NewClusterIdentity

func NewClusterIdentity(identity string, kind string) *ClusterIdentity

func (*ClusterIdentity) AsKey

func (ci *ClusterIdentity) AsKey() string

func (*ClusterIdentity) Descriptor deprecated

func (*ClusterIdentity) Descriptor() ([]byte, []int)

Deprecated: Use ClusterIdentity.ProtoReflect.Descriptor instead.

func (*ClusterIdentity) ExtensionID

func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID

func (*ClusterIdentity) GetIdentity

func (x *ClusterIdentity) GetIdentity() string

func (*ClusterIdentity) GetKind

func (x *ClusterIdentity) GetKind() string

func (*ClusterIdentity) ProtoMessage

func (*ClusterIdentity) ProtoMessage()

func (*ClusterIdentity) ProtoReflect

func (x *ClusterIdentity) ProtoReflect() protoreflect.Message

func (*ClusterIdentity) Reset

func (x *ClusterIdentity) Reset()

func (*ClusterIdentity) String

func (x *ClusterIdentity) String() string

func (*ClusterIdentity) ToShortString

func (ci *ClusterIdentity) ToShortString() string

remove

type ClusterInit

type ClusterInit struct {
	Identity *ClusterIdentity
	Cluster  *Cluster
}

type ClusterProvider

type ClusterProvider interface {
	StartMember(cluster *Cluster) error
	StartClient(cluster *Cluster) error
	Shutdown(graceful bool) error
}

type ClusterTopology

type ClusterTopology struct {
	TopologyHash uint64    `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Members      []*Member `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"`
	Joined       []*Member `protobuf:"bytes,3,rep,name=joined,proto3" json:"joined,omitempty"`
	Left         []*Member `protobuf:"bytes,4,rep,name=left,proto3" json:"left,omitempty"`
	Blocked      []string  `protobuf:"bytes,5,rep,name=blocked,proto3" json:"blocked,omitempty"`
	// contains filtered or unexported fields
}

func (*ClusterTopology) Descriptor deprecated

func (*ClusterTopology) Descriptor() ([]byte, []int)

Deprecated: Use ClusterTopology.ProtoReflect.Descriptor instead.

func (*ClusterTopology) GetBlocked

func (x *ClusterTopology) GetBlocked() []string

func (*ClusterTopology) GetJoined

func (x *ClusterTopology) GetJoined() []*Member

func (*ClusterTopology) GetLeft

func (x *ClusterTopology) GetLeft() []*Member

func (*ClusterTopology) GetMembers

func (x *ClusterTopology) GetMembers() []*Member

func (*ClusterTopology) GetTopologyHash

func (x *ClusterTopology) GetTopologyHash() uint64

func (*ClusterTopology) ProtoMessage

func (*ClusterTopology) ProtoMessage()

func (*ClusterTopology) ProtoReflect

func (x *ClusterTopology) ProtoReflect() protoreflect.Message

func (*ClusterTopology) Reset

func (x *ClusterTopology) Reset()

func (*ClusterTopology) String

func (x *ClusterTopology) String() string

type ClusterTopologyNotification

type ClusterTopologyNotification struct {
	MemberId     string `protobuf:"bytes,1,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"`
	TopologyHash uint32 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	LeaderId     string `protobuf:"bytes,3,opt,name=leader_id,json=leaderId,proto3" json:"leader_id,omitempty"`
	// contains filtered or unexported fields
}

func (*ClusterTopologyNotification) Descriptor deprecated

func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)

Deprecated: Use ClusterTopologyNotification.ProtoReflect.Descriptor instead.

func (*ClusterTopologyNotification) GetLeaderId

func (x *ClusterTopologyNotification) GetLeaderId() string

func (*ClusterTopologyNotification) GetMemberId

func (x *ClusterTopologyNotification) GetMemberId() string

func (*ClusterTopologyNotification) GetTopologyHash

func (x *ClusterTopologyNotification) GetTopologyHash() uint32

func (*ClusterTopologyNotification) ProtoMessage

func (*ClusterTopologyNotification) ProtoMessage()

func (*ClusterTopologyNotification) ProtoReflect

func (*ClusterTopologyNotification) Reset

func (x *ClusterTopologyNotification) Reset()

func (*ClusterTopologyNotification) String

func (x *ClusterTopologyNotification) String() string

type Config

type Config struct {
	Name                                         string
	Address                                      string
	ClusterProvider                              ClusterProvider
	IdentityLookup                               IdentityLookup
	RemoteConfig                                 *remote.Config
	RequestTimeoutTime                           time.Duration
	RequestsLogThrottlePeriod                    time.Duration
	RequestLog                                   bool
	MaxNumberOfEventsInRequestLogThrottledPeriod int
	ClusterContextProducer                       ContextProducer
	MemberStrategyBuilder                        func(cluster *Cluster, kind string) MemberStrategy
	Kinds                                        map[string]*Kind
	TimeoutTime                                  time.Duration
	GossipInterval                               time.Duration
	GossipRequestTimeout                         time.Duration
	GossipFanOut                                 int
	GossipMaxSend                                int
	HeartbeatExpiration                          time.Duration // Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the BlockList
	PubSubConfig                                 *PubSubConfig
}

func Configure

func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config

func (*Config) ToClusterContextConfig

func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig

ToClusterContextConfig converts this cluster Config Context parameters into a valid ClusterContextConfig value and returns a pointer to its memory

type ConfigOption

type ConfigOption func(config *Config)

func WithClusterContextProducer

func WithClusterContextProducer(producer ContextProducer) ConfigOption

WithClusterContextProducer sets the cluster context producer.

func WithHeartbeatExpiration

func WithHeartbeatExpiration(t time.Duration) ConfigOption

WithHeartbeatExpiration sets the gossip heartbeat expiration.

func WithKinds

func WithKinds(kinds ...*Kind) ConfigOption

func WithMaxNumberOfEventsInRequestLogThrottlePeriod

func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption

WithMaxNumberOfEventsInRequestLogThrottlePeriod sets the max number of events in request log throttled period.

func WithPubSubSubscriberTimeout

func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption

WithPubSubSubscriberTimeout sets a timeout used when delivering a message batch to a subscriber. Default is 5s.

func WithRequestLog

func WithRequestLog(enabled bool) ConfigOption

func WithRequestTimeout

func WithRequestTimeout(t time.Duration) ConfigOption

WithRequestTimeout sets the request timeout.

func WithRequestsLogThrottlePeriod

func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption

WithRequestsLogThrottlePeriod sets the requests log throttle period.

type ConsensusCheck

type ConsensusCheck struct {
	// contains filtered or unexported fields
}

data structure helpful to store consensus check information and behavior.

func NewConsensusCheck

func NewConsensusCheck(affectedKeys []string, check GossipUpdater) ConsensusCheck

creates a new ConsensusCheck value with the given data and return it back.

type ConsensusCheckBuilder

type ConsensusCheckBuilder struct {
	// contains filtered or unexported fields
}

func NewConsensusCheckBuilder

func NewConsensusCheckBuilder(logger *slog.Logger, key string, getValue func(*anypb.Any) interface{}) *ConsensusCheckBuilder

func (*ConsensusCheckBuilder) AffectedKeys

func (ccb *ConsensusCheckBuilder) AffectedKeys() []string

func (*ConsensusCheckBuilder) Build

Build builds a new ConsensusHandler and ConsensusCheck values and returns pointers to them

func (*ConsensusCheckBuilder) Check

func (*ConsensusCheckBuilder) HasConsensus

func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)

func (*ConsensusCheckBuilder) MapToValue

func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)

type ConsensusCheckDefinition

type ConsensusCheckDefinition interface {
	Check() *ConsensusCheck
	AffectedKeys() map[string]struct{}
}

type ConsensusChecker

type ConsensusChecker func(*GossipState, map[string]empty) (bool, interface{})

ConsensusChecker Customary type used to provide consensus check callbacks of any type note: this is equivalent to (for future go v1.18):

type ConsensusChecker[T] func(GossipState, map[string]empty) (bool, T)

type ConsensusChecks

type ConsensusChecks struct {
	// contains filtered or unexported fields
}

acts as a storage of pointers to ConsensusCheck stored by key.

func NewConsensusChecks

func NewConsensusChecks() *ConsensusChecks

creates a new ConsensusChecks value and returns a pointer to it.

func (*ConsensusChecks) Add

func (cc *ConsensusChecks) Add(id string, check *ConsensusCheck)

adds a new pointer to a ConsensusCheck value in the storage and registers its affected by keys index.

func (*ConsensusChecks) GetByUpdatedKey

func (cc *ConsensusChecks) GetByUpdatedKey(key string) []*ConsensusCheck

iterates over all the keys stored in the set (map[string]empty) found in the given key map and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given key.

func (*ConsensusChecks) GetByUpdatedKeys

func (cc *ConsensusChecks) GetByUpdatedKeys(keys []string) []*ConsensusCheck

iterate over all the keys stored in the set (map[string]empty) found in the given key maps and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given keys with removed duplicates on it (as it is a "set").

func (*ConsensusChecks) Remove

func (cc *ConsensusChecks) Remove(id string)

Remove removes the given ConsensusCheck identity from the storage and removes its affected by keys index if needed after cleaning.

type ConsensusHandler

type ConsensusHandler interface {
	GetID() string
	TryGetConsensus(context.Context) (interface{}, bool)
}

type Context

type Context interface {
	Request(identity string, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)
	RequestFuture(identity string, kind string, message interface{}, opts ...GrainCallOption) (*actor.Future, error)
}

Context is an interface any cluster context needs to implement

type ContextProducer

type ContextProducer func(*Cluster) Context

Defines a type to provide DefaultContext configurations / implementations.

type DefaultContext

type DefaultContext struct {
	// contains filtered or unexported fields
}

Defines a default cluster context hashBytes structure.

func (*DefaultContext) Request

func (dcc *DefaultContext) Request(identity, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)

func (*DefaultContext) RequestFuture

func (dcc *DefaultContext) RequestFuture(identity string, kind string, message interface{}, opts ...GrainCallOption) (*actor.Future, error)

type DeliverBatchRequest

type DeliverBatchRequest struct {
	Subscribers *Subscribers
	PubSubBatch *PubSubBatch
	Topic       string
}

func (*DeliverBatchRequest) Serialize

func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error)

type DeliverBatchRequestTransport

type DeliverBatchRequestTransport struct {
	Subscribers *Subscribers          `protobuf:"bytes,1,opt,name=subscribers,proto3" json:"subscribers,omitempty"`
	Batch       *PubSubBatchTransport `protobuf:"bytes,2,opt,name=batch,proto3" json:"batch,omitempty"`
	Topic       string                `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
	// contains filtered or unexported fields
}

Message sent from topic to delivery actor

func (*DeliverBatchRequestTransport) Descriptor deprecated

func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int)

Deprecated: Use DeliverBatchRequestTransport.ProtoReflect.Descriptor instead.

func (*DeliverBatchRequestTransport) Deserialize

func (*DeliverBatchRequestTransport) GetBatch

func (*DeliverBatchRequestTransport) GetSubscribers

func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers

func (*DeliverBatchRequestTransport) GetTopic

func (x *DeliverBatchRequestTransport) GetTopic() string

func (*DeliverBatchRequestTransport) ProtoMessage

func (*DeliverBatchRequestTransport) ProtoMessage()

func (*DeliverBatchRequestTransport) ProtoReflect

func (*DeliverBatchRequestTransport) Reset

func (x *DeliverBatchRequestTransport) Reset()

func (*DeliverBatchRequestTransport) String

type DeliveryStatus

type DeliveryStatus int32

Delivery status as seen by the delivery actor

const (
	// Message was put in the queue of the subscriber
	DeliveryStatus_Delivered DeliveryStatus = 0
	// Message did not reach subscriber, because it was dead
	DeliveryStatus_SubscriberNoLongerReachable DeliveryStatus = 1
	// Delivery timed out
	DeliveryStatus_Timeout DeliveryStatus = 2
	// Some other problem happened
	DeliveryStatus_OtherError DeliveryStatus = 127
)

func (DeliveryStatus) Descriptor

func (DeliveryStatus) Enum

func (x DeliveryStatus) Enum() *DeliveryStatus

func (DeliveryStatus) EnumDescriptor deprecated

func (DeliveryStatus) EnumDescriptor() ([]byte, []int)

Deprecated: Use DeliveryStatus.Descriptor instead.

func (DeliveryStatus) Number

func (DeliveryStatus) String

func (x DeliveryStatus) String() string

func (DeliveryStatus) Type

type EmptyKeyValueStore

type EmptyKeyValueStore[T any] struct{}

EmptyKeyValueStore is a key value store that does nothing.

func (*EmptyKeyValueStore[T]) Clear

func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error

func (*EmptyKeyValueStore[T]) Get

func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error)

func (*EmptyKeyValueStore[T]) Set

func (e *EmptyKeyValueStore[T]) Set(_ context.Context, _ string, _ T) error

type GetGossipStateRequest

type GetGossipStateRequest struct {
	Key string
}

Used to query the GossipActor about a given key status

func NewGetGossipStateRequest

func NewGetGossipStateRequest(key string) GetGossipStateRequest

Create a new GetGossipStateRequest value and return it back

type GetGossipStateResponse

type GetGossipStateResponse struct {
	State map[string]*GossipKeyValue
}

Used by the GossipActor to send back the status value of a given key

func NewGetGossipStateResponse

func NewGetGossipStateResponse(state map[string]*GossipKeyValue) GetGossipStateResponse

type GetPid

type GetPid struct {
	ClusterIdentity *ClusterIdentity
}

GetPid contains

type Gossip

The Gossip interface must be implemented by any value that pretends to participate with-in the Gossip protocol

type GossipActor

type GossipActor struct {
	// contains filtered or unexported fields
}

Actor used to send gossip messages around

func NewGossipActor

func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers func() set.Set[string], fanOut int, maxSend int, system *actor.ActorSystem) *GossipActor

Creates a new GossipActor and returns a pointer to its location in the heap

func (*GossipActor) Receive

func (ga *GossipActor) Receive(ctx actor.Context)

Receive method.

func (*GossipActor) ReceiveState

func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)

type GossipConsensusChecker

type GossipConsensusChecker interface {
	AddConsensusCheck(id string, check *ConsensusCheck)
	RemoveConsensusCheck(id string)
}

This interface must be implemented by any value that wants to add or remove consensus checkers

type GossipCore

type GossipCore interface {
	UpdateClusterTopology(topology *ClusterTopology)
	ReceiveState(remoteState *GossipState) []*GossipUpdate
	SendState(sendStateToMember LocalStateSender)
	GetMemberStateDelta(targetMemberID string) *MemberStateDelta
}

This interface must be implemented by any value that wants to react to cluster topology events

type GossipDeltaValue

type GossipDeltaValue struct {
	Entries []*GossipDeltaValue_GossipDeltaEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
	// contains filtered or unexported fields
}

represents a value that can be sent in form of a delta change instead of a full value replace

func (*GossipDeltaValue) Descriptor deprecated

func (*GossipDeltaValue) Descriptor() ([]byte, []int)

Deprecated: Use GossipDeltaValue.ProtoReflect.Descriptor instead.

func (*GossipDeltaValue) GetEntries

func (*GossipDeltaValue) ProtoMessage

func (*GossipDeltaValue) ProtoMessage()

func (*GossipDeltaValue) ProtoReflect

func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message

func (*GossipDeltaValue) Reset

func (x *GossipDeltaValue) Reset()

func (*GossipDeltaValue) String

func (x *GossipDeltaValue) String() string

type GossipDeltaValue_GossipDeltaEntry

type GossipDeltaValue_GossipDeltaEntry struct {
	SequenceNumber int64  `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"`
	Data           []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

these are the entries of a delta value this can be seen as an array with data, where each element in the array is tagged with a sequence number

func (*GossipDeltaValue_GossipDeltaEntry) Descriptor deprecated

func (*GossipDeltaValue_GossipDeltaEntry) Descriptor() ([]byte, []int)

Deprecated: Use GossipDeltaValue_GossipDeltaEntry.ProtoReflect.Descriptor instead.

func (*GossipDeltaValue_GossipDeltaEntry) GetData

func (x *GossipDeltaValue_GossipDeltaEntry) GetData() []byte

func (*GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber

func (x *GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber() int64

func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage

func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage()

func (*GossipDeltaValue_GossipDeltaEntry) ProtoReflect

func (*GossipDeltaValue_GossipDeltaEntry) Reset

func (*GossipDeltaValue_GossipDeltaEntry) String

type GossipKeyValue

type GossipKeyValue struct {
	SequenceNumber                 int64      `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member
	Value                          *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"`                                          //value is any format
	LocalTimestampUnixMilliseconds int64      ``                                                                                                         /* 156-byte string literal not displayed */
	// contains filtered or unexported fields
}

a known key might be heartbeat. if we locally tag each entry with a local timestamp this means that we can measure if we have not received a new heartbeat from one member in some time even if we don't know the exact time the heartbeat was issued, due to clock differences. we still know when _we_ as in this node, got this data. and we can measure time from then til now.

if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead

func (*GossipKeyValue) Descriptor deprecated

func (*GossipKeyValue) Descriptor() ([]byte, []int)

Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead.

func (*GossipKeyValue) GetLocalTimestampUnixMilliseconds

func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64

func (*GossipKeyValue) GetSequenceNumber

func (x *GossipKeyValue) GetSequenceNumber() int64

func (*GossipKeyValue) GetValue

func (x *GossipKeyValue) GetValue() *anypb.Any

func (*GossipKeyValue) ProtoMessage

func (*GossipKeyValue) ProtoMessage()

func (*GossipKeyValue) ProtoReflect

func (x *GossipKeyValue) ProtoReflect() protoreflect.Message

func (*GossipKeyValue) Reset

func (x *GossipKeyValue) Reset()

func (*GossipKeyValue) String

func (x *GossipKeyValue) String() string

type GossipKeyValues

type GossipKeyValues = map[string]*GossipKeyValue

type GossipMemberState

type GossipMemberState = GossipState_GossipMemberState

convenience type alias

type GossipMemberStates

type GossipMemberStates = map[string]*GossipMemberState

type GossipRequest

type GossipRequest struct {
	MemberId string       `protobuf:"bytes,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"`
	State    *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
	// contains filtered or unexported fields
}

func (*GossipRequest) Descriptor deprecated

func (*GossipRequest) Descriptor() ([]byte, []int)

Deprecated: Use GossipRequest.ProtoReflect.Descriptor instead.

func (*GossipRequest) GetMemberId

func (x *GossipRequest) GetMemberId() string

func (*GossipRequest) GetState

func (x *GossipRequest) GetState() *GossipState

func (*GossipRequest) ProtoMessage

func (*GossipRequest) ProtoMessage()

func (*GossipRequest) ProtoReflect

func (x *GossipRequest) ProtoReflect() protoreflect.Message

func (*GossipRequest) Reset

func (x *GossipRequest) Reset()

func (*GossipRequest) String

func (x *GossipRequest) String() string

type GossipResponse

type GossipResponse struct {
	State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
	// contains filtered or unexported fields
}

Ack a gossip request

func (*GossipResponse) Descriptor deprecated

func (*GossipResponse) Descriptor() ([]byte, []int)

Deprecated: Use GossipResponse.ProtoReflect.Descriptor instead.

func (*GossipResponse) GetState

func (x *GossipResponse) GetState() *GossipState

func (*GossipResponse) ProtoMessage

func (*GossipResponse) ProtoMessage()

func (*GossipResponse) ProtoReflect

func (x *GossipResponse) ProtoReflect() protoreflect.Message

func (*GossipResponse) Reset

func (x *GossipResponse) Reset()

func (*GossipResponse) String

func (x *GossipResponse) String() string

type GossipState

type GossipState struct {
	Members map[string]*GossipState_GossipMemberState `` /* 155-byte string literal not displayed */
	// contains filtered or unexported fields
}

two GossipState objects can be merged key + member_id gets it's own entry, if collision, highest version is selected

func (*GossipState) Descriptor deprecated

func (*GossipState) Descriptor() ([]byte, []int)

Deprecated: Use GossipState.ProtoReflect.Descriptor instead.

func (*GossipState) GetMembers

func (x *GossipState) GetMembers() map[string]*GossipState_GossipMemberState

func (*GossipState) ProtoMessage

func (*GossipState) ProtoMessage()

func (*GossipState) ProtoReflect

func (x *GossipState) ProtoReflect() protoreflect.Message

func (*GossipState) Reset

func (x *GossipState) Reset()

func (*GossipState) String

func (x *GossipState) String() string

type GossipStateStorer

type GossipStateStorer interface {
	GetState(key string) map[string]*GossipKeyValue
	SetState(key string, value proto.Message)
}

This interface must be implemented by any value that. wants to be used as a gossip state storage

type GossipState_GossipMemberState

type GossipState_GossipMemberState struct {
	Values map[string]*GossipKeyValue `` /* 153-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*GossipState_GossipMemberState) Descriptor deprecated

func (*GossipState_GossipMemberState) Descriptor() ([]byte, []int)

Deprecated: Use GossipState_GossipMemberState.ProtoReflect.Descriptor instead.

func (*GossipState_GossipMemberState) GetValues

func (*GossipState_GossipMemberState) ProtoMessage

func (*GossipState_GossipMemberState) ProtoMessage()

func (*GossipState_GossipMemberState) ProtoReflect

func (*GossipState_GossipMemberState) Reset

func (x *GossipState_GossipMemberState) Reset()

func (*GossipState_GossipMemberState) String

type GossipUpdate

type GossipUpdate struct {
	MemberID, Key string
	Value         *anypb.Any
	SeqNumber     int64
}

GossipUpdate Used to update gossip data when a ClusterTopology event occurs

type GossipUpdater

type GossipUpdater func(*GossipState, map[string]empty)

type Gossiper

type Gossiper struct {
	// The Gossiper Actor Name, defaults to "gossip"
	GossipActorName string
	// contains filtered or unexported fields
}

The Gossiper data structure manages Gossip

func (*Gossiper) GetState

func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error)

func (*Gossiper) RegisterConsensusCheck

func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler

RegisterConsensusCheck Builds a consensus handler and a consensus checker, send the checker to the Gossip actor and returns the handler back to the caller

func (*Gossiper) SendState

func (g *Gossiper) SendState()

func (*Gossiper) SetState

func (g *Gossiper) SetState(key string, value proto.Message)

SetState Sends fire and forget message to update member state

func (*Gossiper) SetStateRequest

func (g *Gossiper) SetStateRequest(key string, value proto.Message) error

SetStateRequest Sends a Request (that blocks) to update member state

func (*Gossiper) Shutdown

func (g *Gossiper) Shutdown()

func (*Gossiper) StartGossiping

func (g *Gossiper) StartGossiping() error

type GrainCallConfig

type GrainCallConfig struct {
	RetryCount  int
	Timeout     time.Duration
	RetryAction func(n int) int
	Context     actor.SenderContext
}

func DefaultGrainCallConfig

func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig

func NewGrainCallOptions

func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig

type GrainCallOption

type GrainCallOption func(config *GrainCallConfig)

func WithContext

func WithContext(ctx actor.SenderContext) GrainCallOption

func WithRetryAction

func WithRetryAction(act func(i int) int) GrainCallOption

func WithRetryCount

func WithRetryCount(count int) GrainCallOption

func WithTimeout

func WithTimeout(timeout time.Duration) GrainCallOption

type GrainContext

type GrainContext interface {
	actor.Context

	Identity() string
	Kind() string
	Cluster() *Cluster
}

func NewGrainContext

func NewGrainContext(context actor.Context, identity *ClusterIdentity, cluster *Cluster) GrainContext

type GrainErrorResponse

type GrainErrorResponse struct {
	Reason   string            `protobuf:"bytes,1,opt,name=reason,proto3" json:"reason,omitempty"`
	Message  string            `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	Metadata map[string]string `` /* 157-byte string literal not displayed */
	// contains filtered or unexported fields
}

func FromError

func FromError(err error) *GrainErrorResponse

func NewGrainErrorResponse

func NewGrainErrorResponse(reason, message string) *GrainErrorResponse

func NewGrainErrorResponsef

func NewGrainErrorResponsef(reason, format string, args ...interface{}) *GrainErrorResponse

func (*GrainErrorResponse) Descriptor deprecated

func (*GrainErrorResponse) Descriptor() ([]byte, []int)

Deprecated: Use GrainErrorResponse.ProtoReflect.Descriptor instead.

func (*GrainErrorResponse) Error

func (m *GrainErrorResponse) Error() string

func (*GrainErrorResponse) Errorf

func (m *GrainErrorResponse) Errorf(format string, args ...interface{}) error

func (*GrainErrorResponse) GetMessage

func (x *GrainErrorResponse) GetMessage() string

func (*GrainErrorResponse) GetMetadata

func (x *GrainErrorResponse) GetMetadata() map[string]string

func (*GrainErrorResponse) GetReason

func (x *GrainErrorResponse) GetReason() string

func (*GrainErrorResponse) Is

func (m *GrainErrorResponse) Is(err error) bool

func (*GrainErrorResponse) ProtoMessage

func (*GrainErrorResponse) ProtoMessage()

func (*GrainErrorResponse) ProtoReflect

func (x *GrainErrorResponse) ProtoReflect() protoreflect.Message

func (*GrainErrorResponse) Reset

func (x *GrainErrorResponse) Reset()

func (*GrainErrorResponse) String

func (x *GrainErrorResponse) String() string

func (*GrainErrorResponse) WithMetadata

func (m *GrainErrorResponse) WithMetadata(metadata map[string]string) *GrainErrorResponse

type GrainRequest

type GrainRequest struct {
	MethodIndex     int32  `protobuf:"varint,1,opt,name=method_index,json=methodIndex,proto3" json:"method_index,omitempty"`
	MessageData     []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	MessageTypeName string `protobuf:"bytes,3,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"`
	// contains filtered or unexported fields
}

func (*GrainRequest) Descriptor deprecated

func (*GrainRequest) Descriptor() ([]byte, []int)

Deprecated: Use GrainRequest.ProtoReflect.Descriptor instead.

func (*GrainRequest) GetMessageData

func (x *GrainRequest) GetMessageData() []byte

func (*GrainRequest) GetMessageTypeName

func (x *GrainRequest) GetMessageTypeName() string

func (*GrainRequest) GetMethodIndex

func (x *GrainRequest) GetMethodIndex() int32

func (*GrainRequest) ProtoMessage

func (*GrainRequest) ProtoMessage()

func (*GrainRequest) ProtoReflect

func (x *GrainRequest) ProtoReflect() protoreflect.Message

func (*GrainRequest) Reset

func (x *GrainRequest) Reset()

func (*GrainRequest) String

func (x *GrainRequest) String() string

type GrainResponse

type GrainResponse struct {
	MessageData     []byte `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	MessageTypeName string `protobuf:"bytes,2,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"`
	// contains filtered or unexported fields
}

func (*GrainResponse) Descriptor deprecated

func (*GrainResponse) Descriptor() ([]byte, []int)

Deprecated: Use GrainResponse.ProtoReflect.Descriptor instead.

func (*GrainResponse) GetMessageData

func (x *GrainResponse) GetMessageData() []byte

func (*GrainResponse) GetMessageTypeName

func (x *GrainResponse) GetMessageTypeName() string

func (*GrainResponse) ProtoMessage

func (*GrainResponse) ProtoMessage()

func (*GrainResponse) ProtoReflect

func (x *GrainResponse) ProtoReflect() protoreflect.Message

func (*GrainResponse) Reset

func (x *GrainResponse) Reset()

func (*GrainResponse) String

func (x *GrainResponse) String() string

type IdentityHandover

type IdentityHandover struct {
	Actors       []*Activation `protobuf:"bytes,1,rep,name=actors,proto3" json:"actors,omitempty"`
	ChunkId      int32         `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	Final        bool          `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"`
	TopologyHash uint64        `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Skipped      int32         `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` // Total number of activations skipped
	Sent         int32         `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"`       // Total number of activations sent
	// contains filtered or unexported fields
}

func (*IdentityHandover) Descriptor deprecated

func (*IdentityHandover) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandover.ProtoReflect.Descriptor instead.

func (*IdentityHandover) GetActors

func (x *IdentityHandover) GetActors() []*Activation

func (*IdentityHandover) GetChunkId

func (x *IdentityHandover) GetChunkId() int32

func (*IdentityHandover) GetFinal

func (x *IdentityHandover) GetFinal() bool

func (*IdentityHandover) GetSent

func (x *IdentityHandover) GetSent() int32

func (*IdentityHandover) GetSkipped

func (x *IdentityHandover) GetSkipped() int32

func (*IdentityHandover) GetTopologyHash

func (x *IdentityHandover) GetTopologyHash() uint64

func (*IdentityHandover) ProtoMessage

func (*IdentityHandover) ProtoMessage()

func (*IdentityHandover) ProtoReflect

func (x *IdentityHandover) ProtoReflect() protoreflect.Message

func (*IdentityHandover) Reset

func (x *IdentityHandover) Reset()

func (*IdentityHandover) String

func (x *IdentityHandover) String() string

type IdentityHandoverAck

type IdentityHandoverAck struct {
	ChunkId         int32                     `protobuf:"varint,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	TopologyHash    uint64                    `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	ProcessingState IdentityHandoverAck_State `` /* 146-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*IdentityHandoverAck) Descriptor deprecated

func (*IdentityHandoverAck) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverAck.ProtoReflect.Descriptor instead.

func (*IdentityHandoverAck) GetChunkId

func (x *IdentityHandoverAck) GetChunkId() int32

func (*IdentityHandoverAck) GetProcessingState

func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State

func (*IdentityHandoverAck) GetTopologyHash

func (x *IdentityHandoverAck) GetTopologyHash() uint64

func (*IdentityHandoverAck) ProtoMessage

func (*IdentityHandoverAck) ProtoMessage()

func (*IdentityHandoverAck) ProtoReflect

func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message

func (*IdentityHandoverAck) Reset

func (x *IdentityHandoverAck) Reset()

func (*IdentityHandoverAck) String

func (x *IdentityHandoverAck) String() string

type IdentityHandoverAck_State

type IdentityHandoverAck_State int32
const (
	IdentityHandoverAck_processed          IdentityHandoverAck_State = 0
	IdentityHandoverAck_incorrect_topology IdentityHandoverAck_State = 1
)

func (IdentityHandoverAck_State) Descriptor

func (IdentityHandoverAck_State) Enum

func (IdentityHandoverAck_State) EnumDescriptor deprecated

func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverAck_State.Descriptor instead.

func (IdentityHandoverAck_State) Number

func (IdentityHandoverAck_State) String

func (x IdentityHandoverAck_State) String() string

func (IdentityHandoverAck_State) Type

type IdentityHandoverRequest

type IdentityHandoverRequest struct {
	CurrentTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,1,opt,name=current_topology,json=currentTopology,proto3" json:"current_topology,omitempty"`
	Address         string                            `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
	// If the requester passes a delta topology, only return activations which would not be assigned to the member
	// in the previous topology.
	DeltaTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,3,opt,name=delta_topology,json=deltaTopology,proto3" json:"delta_topology,omitempty"`
	// contains filtered or unexported fields
}

request response call from Identity actor sent to each member asking what activations they hold that belong to the requester

func (*IdentityHandoverRequest) Descriptor deprecated

func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverRequest.ProtoReflect.Descriptor instead.

func (*IdentityHandoverRequest) GetAddress

func (x *IdentityHandoverRequest) GetAddress() string

func (*IdentityHandoverRequest) GetCurrentTopology

func (*IdentityHandoverRequest) GetDeltaTopology

func (*IdentityHandoverRequest) ProtoMessage

func (*IdentityHandoverRequest) ProtoMessage()

func (*IdentityHandoverRequest) ProtoReflect

func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message

func (*IdentityHandoverRequest) Reset

func (x *IdentityHandoverRequest) Reset()

func (*IdentityHandoverRequest) String

func (x *IdentityHandoverRequest) String() string

type IdentityHandoverRequest_Topology

type IdentityHandoverRequest_Topology struct {
	TopologyHash uint64    `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Members      []*Member `protobuf:"bytes,3,rep,name=members,proto3" json:"members,omitempty"`
	// contains filtered or unexported fields
}

func (*IdentityHandoverRequest_Topology) Descriptor deprecated

func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverRequest_Topology.ProtoReflect.Descriptor instead.

func (*IdentityHandoverRequest_Topology) GetMembers

func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member

func (*IdentityHandoverRequest_Topology) GetTopologyHash

func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64

func (*IdentityHandoverRequest_Topology) ProtoMessage

func (*IdentityHandoverRequest_Topology) ProtoMessage()

func (*IdentityHandoverRequest_Topology) ProtoReflect

func (*IdentityHandoverRequest_Topology) Reset

func (*IdentityHandoverRequest_Topology) String

type IdentityLookup

type IdentityLookup interface {
	Get(clusterIdentity *ClusterIdentity) *actor.PID

	RemovePid(clusterIdentity *ClusterIdentity, pid *actor.PID)

	Setup(cluster *Cluster, kinds []string, isClient bool)

	Shutdown()
}

IdentityLookup contains

type IdentityStorageLookup

type IdentityStorageLookup struct {
	Storage StorageLookup
	// contains filtered or unexported fields
}

IdentityStorageLookup contains

func (*IdentityStorageLookup) Get

func (id *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID

Get returns a PID for a given ClusterIdentity

func (*IdentityStorageLookup) RemoveMember

func (i *IdentityStorageLookup) RemoveMember(memberID string)

RemoveMember from identity storage

func (*IdentityStorageLookup) Setup

func (id *IdentityStorageLookup) Setup(cluster *Cluster, kinds []string, isClient bool)

type IdentityStorageWorker

type IdentityStorageWorker struct {
	// contains filtered or unexported fields
}

func (*IdentityStorageWorker) Receive

func (ids *IdentityStorageWorker) Receive(c actor.Context)

Receive func

type Informer

type Informer struct {
	// contains filtered or unexported fields
}

The Informer data structure implements the Gossip interface

func (*Informer) AddConsensusCheck

func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)

adds a new consensus checker to this informer

func (*Informer) CheckConsensus

func (inf *Informer) CheckConsensus(updatedKeys ...string)

check consensus for the given keys

func (*Informer) GetMemberStateDelta

func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta

func (*Informer) GetState

func (inf *Informer) GetState(key string) map[string]*GossipKeyValue

retrieves this informer current state for the given key returns map containing each known member id and their value

func (*Informer) ReceiveState

func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate

receives a remote informer state

func (*Informer) RemoveConsensusCheck

func (inf *Informer) RemoveConsensusCheck(id string)

removes a consensus checker from this informer

func (*Informer) SendState

func (inf *Informer) SendState(sendStateToMember LocalStateSender)

sends this informer local state to remote informers chosen randomly from the slice of other members known by this informer until gossipFanOut number of sent has been reached

func (*Informer) SetState

func (inf *Informer) SetState(key string, message proto.Message)

sets new update key state using the given proto message

func (*Informer) UpdateClusterTopology

func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)

called when there is a cluster topology update

type Initialize

type Initialize struct {
	IdleTimeout *durationpb.Duration `protobuf:"bytes,1,opt,name=idleTimeout,proto3" json:"idleTimeout,omitempty"`
	// contains filtered or unexported fields
}

First request to initialize the actor.

func (*Initialize) Descriptor deprecated

func (*Initialize) Descriptor() ([]byte, []int)

Deprecated: Use Initialize.ProtoReflect.Descriptor instead.

func (*Initialize) GetIdleTimeout

func (x *Initialize) GetIdleTimeout() *durationpb.Duration

func (*Initialize) ProtoMessage

func (*Initialize) ProtoMessage()

func (*Initialize) ProtoReflect

func (x *Initialize) ProtoReflect() protoreflect.Message

func (*Initialize) Reset

func (x *Initialize) Reset()

func (*Initialize) String

func (x *Initialize) String() string

type InvalidOperationException

type InvalidOperationException struct {
	Topic string
}

func (*InvalidOperationException) Error

func (i *InvalidOperationException) Error() string

func (*InvalidOperationException) Is

type KeyValueStore

type KeyValueStore[T any] interface {
	// Set the value for the given key.
	Set(ctx context.Context, key string, value T) error
	// Get the value for the given key..
	Get(ctx context.Context, key string) (T, error)
	// Clear the value for the given key.
	Clear(ctx context.Context, key string) error
}

KeyValueStore is a distributed key value store

type Kind

type Kind struct {
	Kind            string
	Props           *actor.Props
	StrategyBuilder func(*Cluster) MemberStrategy
}

Kind represents the kinds of actors a cluster can manage

func NewKind

func NewKind(kind string, props *actor.Props) *Kind

NewKind creates a new instance of a kind

func (*Kind) Build

func (k *Kind) Build(cluster *Cluster) *ActivatedKind

func (*Kind) WithMemberStrategy

func (k *Kind) WithMemberStrategy(strategyBuilder func(*Cluster) MemberStrategy)

type LocalStateSender

type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)

customary type that defines a states sender callback.

type Member

type Member struct {
	Host  string   `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
	Port  int32    `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
	Id    string   `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
	Kinds []string `protobuf:"bytes,4,rep,name=kinds,proto3" json:"kinds,omitempty"`
	// contains filtered or unexported fields
}

func (*Member) Address

func (m *Member) Address() string

Address return a "host:port". Member defined by protos.proto

func (*Member) Descriptor deprecated

func (*Member) Descriptor() ([]byte, []int)

Deprecated: Use Member.ProtoReflect.Descriptor instead.

func (*Member) GetHost

func (x *Member) GetHost() string

func (*Member) GetId

func (x *Member) GetId() string

func (*Member) GetKinds

func (x *Member) GetKinds() []string

func (*Member) GetPort

func (x *Member) GetPort() int32

func (*Member) HasKind

func (m *Member) HasKind(kind string) bool

func (*Member) ProtoMessage

func (*Member) ProtoMessage()

func (*Member) ProtoReflect

func (x *Member) ProtoReflect() protoreflect.Message

func (*Member) Reset

func (x *Member) Reset()

func (*Member) String

func (x *Member) String() string

type MemberAvailableEvent

type MemberAvailableEvent struct {
	MemberMeta
}

func (*MemberAvailableEvent) MemberStatusEvent

func (*MemberAvailableEvent) MemberStatusEvent()

type MemberHeartbeat

type MemberHeartbeat struct {
	ActorStatistics *ActorStatistics `protobuf:"bytes,1,opt,name=actor_statistics,json=actorStatistics,proto3" json:"actor_statistics,omitempty"`
	// contains filtered or unexported fields
}

func (*MemberHeartbeat) Descriptor deprecated

func (*MemberHeartbeat) Descriptor() ([]byte, []int)

Deprecated: Use MemberHeartbeat.ProtoReflect.Descriptor instead.

func (*MemberHeartbeat) GetActorStatistics

func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics

func (*MemberHeartbeat) ProtoMessage

func (*MemberHeartbeat) ProtoMessage()

func (*MemberHeartbeat) ProtoReflect

func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message

func (*MemberHeartbeat) Reset

func (x *MemberHeartbeat) Reset()

func (*MemberHeartbeat) String

func (x *MemberHeartbeat) String() string

type MemberJoinedEvent

type MemberJoinedEvent struct {
	MemberMeta
}

func (*MemberJoinedEvent) MemberStatusEvent

func (*MemberJoinedEvent) MemberStatusEvent()

type MemberLeftEvent

type MemberLeftEvent struct {
	MemberMeta
}

func (*MemberLeftEvent) MemberStatusEvent

func (*MemberLeftEvent) MemberStatusEvent()

type MemberList

type MemberList struct {
	// contains filtered or unexported fields
}

MemberList is responsible to keep track of the current cluster topology it does so by listening to changes from the ClusterProvider. the default ClusterProvider is consul.ConsulProvider which uses the Consul HTTP API to scan for changes

func NewMemberList

func NewMemberList(cluster *Cluster) *MemberList

func (*MemberList) BroadcastEvent

func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)

func (*MemberList) ContainsMemberID

func (ml *MemberList) ContainsMemberID(memberID string) bool

func (*MemberList) GetActivatorMember

func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string

func (*MemberList) InitializeTopologyConsensus

func (ml *MemberList) InitializeTopologyConsensus()

func (*MemberList) Length

func (ml *MemberList) Length() int

func (*MemberList) Members

func (ml *MemberList) Members() *MemberSet

func (*MemberList) TerminateMember

func (ml *MemberList) TerminateMember(m *Member)

func (*MemberList) TopologyConsensus

func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)

func (*MemberList) UpdateClusterTopology

func (ml *MemberList) UpdateClusterTopology(members Members)

type MemberMeta

type MemberMeta struct {
	Host  string
	Port  int
	Kinds []string
}

func (*MemberMeta) GetKinds

func (e *MemberMeta) GetKinds() []string

func (*MemberMeta) Name

func (e *MemberMeta) Name() string

type MemberRejoinedEvent

type MemberRejoinedEvent struct {
	MemberMeta
}

func (*MemberRejoinedEvent) MemberStatusEvent

func (*MemberRejoinedEvent) MemberStatusEvent()

type MemberSet

type MemberSet struct {
	// contains filtered or unexported fields
}

func NewMemberSet

func NewMemberSet(members Members) *MemberSet

func (*MemberSet) ContainsID

func (ms *MemberSet) ContainsID(id string) bool

func (*MemberSet) Equals

func (ms *MemberSet) Equals(other *MemberSet) bool

func (*MemberSet) Except

func (ms *MemberSet) Except(other *MemberSet) *MemberSet

func (*MemberSet) ExceptIds

func (ms *MemberSet) ExceptIds(ids []string) *MemberSet

func (*MemberSet) GetMemberById

func (ms *MemberSet) GetMemberById(id string) *Member

func (*MemberSet) Len

func (ms *MemberSet) Len() int

func (*MemberSet) Members

func (ms *MemberSet) Members() Members

func (*MemberSet) TopologyHash

func (ms *MemberSet) TopologyHash() uint64

func (*MemberSet) Union

func (ms *MemberSet) Union(other *MemberSet) *MemberSet

type MemberStateDelta

type MemberStateDelta struct {
	TargetMemberID string
	HasState       bool
	State          *GossipState
	CommitOffsets  func()
}

type MemberStatus

type MemberStatus struct {
	Member
	MemberID string // for compatibility
	Alive    bool
}

func (*MemberStatus) Address

func (m *MemberStatus) Address() string

type MemberStatusEvent

type MemberStatusEvent interface {
	MemberStatusEvent()
	GetKinds() []string
}

type MemberStrategy

type MemberStrategy interface {
	GetAllMembers() Members
	AddMember(member *Member)
	RemoveMember(member *Member)
	GetPartition(key string) string
	GetActivator(senderAddress string) string
}

type MemberUnavailableEvent

type MemberUnavailableEvent struct {
	MemberMeta
}

func (*MemberUnavailableEvent) MemberStatusEvent

func (*MemberUnavailableEvent) MemberStatusEvent()

type Members

type Members []*Member

func CopySortMembers

func CopySortMembers(members Members) Members

func (*Members) ToSet

func (m *Members) ToSet() *MemberSet

type NotifyAboutFailingSubscribersRequest

type NotifyAboutFailingSubscribersRequest struct {
	InvalidDeliveries []*SubscriberDeliveryReport `protobuf:"bytes,1,rep,name=invalid_deliveries,json=invalidDeliveries,proto3" json:"invalid_deliveries,omitempty"`
	// contains filtered or unexported fields
}

Message sent from delivery actor to topic to notify of subscribers that fail to process the messages

func (*NotifyAboutFailingSubscribersRequest) Descriptor deprecated

func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int)

Deprecated: Use NotifyAboutFailingSubscribersRequest.ProtoReflect.Descriptor instead.

func (*NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries

func (*NotifyAboutFailingSubscribersRequest) ProtoMessage

func (*NotifyAboutFailingSubscribersRequest) ProtoMessage()

func (*NotifyAboutFailingSubscribersRequest) ProtoReflect

func (*NotifyAboutFailingSubscribersRequest) Reset

func (*NotifyAboutFailingSubscribersRequest) String

type NotifyAboutFailingSubscribersResponse

type NotifyAboutFailingSubscribersResponse struct {
	// contains filtered or unexported fields
}

Ack to the delivery actor after notification of subscribers that fail to process the messages

func (*NotifyAboutFailingSubscribersResponse) Descriptor deprecated

func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int)

Deprecated: Use NotifyAboutFailingSubscribersResponse.ProtoReflect.Descriptor instead.

func (*NotifyAboutFailingSubscribersResponse) ProtoMessage

func (*NotifyAboutFailingSubscribersResponse) ProtoMessage()

func (*NotifyAboutFailingSubscribersResponse) ProtoReflect

func (*NotifyAboutFailingSubscribersResponse) Reset

func (*NotifyAboutFailingSubscribersResponse) String

type Option

type Option func(g *Gossiper)

type PackedActivations

type PackedActivations struct {
	Address string                    `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	Actors  []*PackedActivations_Kind `protobuf:"bytes,2,rep,name=actors,proto3" json:"actors,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations) Descriptor deprecated

func (*PackedActivations) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations.ProtoReflect.Descriptor instead.

func (*PackedActivations) GetActors

func (x *PackedActivations) GetActors() []*PackedActivations_Kind

func (*PackedActivations) GetAddress

func (x *PackedActivations) GetAddress() string

func (*PackedActivations) ProtoMessage

func (*PackedActivations) ProtoMessage()

func (*PackedActivations) ProtoReflect

func (x *PackedActivations) ProtoReflect() protoreflect.Message

func (*PackedActivations) Reset

func (x *PackedActivations) Reset()

func (*PackedActivations) String

func (x *PackedActivations) String() string

type PackedActivations_Activation

type PackedActivations_Activation struct {
	Identity     string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
	ActivationId string `protobuf:"bytes,2,opt,name=activation_id,json=activationId,proto3" json:"activation_id,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations_Activation) Descriptor deprecated

func (*PackedActivations_Activation) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations_Activation.ProtoReflect.Descriptor instead.

func (*PackedActivations_Activation) GetActivationId

func (x *PackedActivations_Activation) GetActivationId() string

func (*PackedActivations_Activation) GetIdentity

func (x *PackedActivations_Activation) GetIdentity() string

func (*PackedActivations_Activation) ProtoMessage

func (*PackedActivations_Activation) ProtoMessage()

func (*PackedActivations_Activation) ProtoReflect

func (*PackedActivations_Activation) Reset

func (x *PackedActivations_Activation) Reset()

func (*PackedActivations_Activation) String

type PackedActivations_Kind

type PackedActivations_Kind struct {
	Name        string                          `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Activations []*PackedActivations_Activation `protobuf:"bytes,2,rep,name=activations,proto3" json:"activations,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations_Kind) Descriptor deprecated

func (*PackedActivations_Kind) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations_Kind.ProtoReflect.Descriptor instead.

func (*PackedActivations_Kind) GetActivations

func (*PackedActivations_Kind) GetName

func (x *PackedActivations_Kind) GetName() string

func (*PackedActivations_Kind) ProtoMessage

func (*PackedActivations_Kind) ProtoMessage()

func (*PackedActivations_Kind) ProtoReflect

func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message

func (*PackedActivations_Kind) Reset

func (x *PackedActivations_Kind) Reset()

func (*PackedActivations_Kind) String

func (x *PackedActivations_Kind) String() string

type PidCacheValue

type PidCacheValue struct {
	// contains filtered or unexported fields
}

func NewPidCache

func NewPidCache() *PidCacheValue

func (*PidCacheValue) Get

func (c *PidCacheValue) Get(identity string, kind string) (*actor.PID, bool)

func (*PidCacheValue) Remove

func (c *PidCacheValue) Remove(identity string, kind string)

func (*PidCacheValue) RemoveByMember

func (c *PidCacheValue) RemoveByMember(member *Member)

func (*PidCacheValue) RemoveByValue

func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)

func (*PidCacheValue) Set

func (c *PidCacheValue) Set(identity string, kind string, pid *actor.PID)

type PidResult

type PidResult struct {
	Pid *actor.PID
}

PidResult contains

type ProduceProcessInfo

type ProduceProcessInfo struct {
	Finished chan struct{}
	Err      error
	// contains filtered or unexported fields
}

ProduceProcessInfo is the context for a Produce call

func (*ProduceProcessInfo) IsCancelled

func (p *ProduceProcessInfo) IsCancelled() bool

IsCancelled returns true if the context has been cancelled

func (*ProduceProcessInfo) IsFinished

func (p *ProduceProcessInfo) IsFinished() bool

IsFinished returns true if the context has been finished

type ProducerQueueFullException

type ProducerQueueFullException struct {
	// contains filtered or unexported fields
}

func (*ProducerQueueFullException) Error

func (*ProducerQueueFullException) Is

func (p *ProducerQueueFullException) Is(target error) bool

type ProxyActivationRequest

type ProxyActivationRequest struct {
	ClusterIdentity    *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	ReplacedActivation *actor.PID       `protobuf:"bytes,2,opt,name=replaced_activation,json=replacedActivation,proto3" json:"replaced_activation,omitempty"`
	// contains filtered or unexported fields
}

func (*ProxyActivationRequest) Descriptor deprecated

func (*ProxyActivationRequest) Descriptor() ([]byte, []int)

Deprecated: Use ProxyActivationRequest.ProtoReflect.Descriptor instead.

func (*ProxyActivationRequest) GetClusterIdentity

func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity

func (*ProxyActivationRequest) GetReplacedActivation

func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID

func (*ProxyActivationRequest) ProtoMessage

func (*ProxyActivationRequest) ProtoMessage()

func (*ProxyActivationRequest) ProtoReflect

func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message

func (*ProxyActivationRequest) Reset

func (x *ProxyActivationRequest) Reset()

func (*ProxyActivationRequest) String

func (x *ProxyActivationRequest) String() string

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func GetPubSub

func GetPubSub(system *actor.ActorSystem) *PubSub

GetPubSub returns the PubSub extension from the actor system

func NewPubSub

func NewPubSub(cluster *Cluster) *PubSub

func (*PubSub) ExtensionID

func (p *PubSub) ExtensionID() extensions.ExtensionID

func (*PubSub) Start

func (p *PubSub) Start()

Start the PubSubMemberDeliveryActor

type PubSubAutoRespondBatch

type PubSubAutoRespondBatch struct {
	Envelopes []proto.Message
}

func (*PubSubAutoRespondBatch) GetAutoResponse

func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{}

GetAutoResponse returns a PublishResponse.

func (*PubSubAutoRespondBatch) GetMessages

func (b *PubSubAutoRespondBatch) GetMessages() []interface{}

GetMessages returns the message.

func (*PubSubAutoRespondBatch) Serialize

Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.

type PubSubAutoRespondBatchTransport

type PubSubAutoRespondBatchTransport struct {
	TypeNames []string          `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"`
	Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"`
	// contains filtered or unexported fields
}

Message posted to subscriber's mailbox, that is then unrolled to single messages, and has ability to auto respond See also PubSubAutoRespondBatch

func (*PubSubAutoRespondBatchTransport) Descriptor deprecated

func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int)

Deprecated: Use PubSubAutoRespondBatchTransport.ProtoReflect.Descriptor instead.

func (*PubSubAutoRespondBatchTransport) Deserialize

Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.

func (*PubSubAutoRespondBatchTransport) GetEnvelopes

func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope

func (*PubSubAutoRespondBatchTransport) GetTypeNames

func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string

func (*PubSubAutoRespondBatchTransport) ProtoMessage

func (*PubSubAutoRespondBatchTransport) ProtoMessage()

func (*PubSubAutoRespondBatchTransport) ProtoReflect

func (*PubSubAutoRespondBatchTransport) Reset

func (*PubSubAutoRespondBatchTransport) String

type PubSubBatch

type PubSubBatch struct {
	Envelopes []proto.Message
}

func (*PubSubBatch) Serialize

func (b *PubSubBatch) Serialize() (remote.RootSerialized, error)

Serialize converts a PubSubBatch to a PubSubBatchTransport.

type PubSubBatchTransport

type PubSubBatchTransport struct {
	TypeNames []string          `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"`
	Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"`
	// contains filtered or unexported fields
}

Message sent from publisher to topic actor See also PubSubBatch

func (*PubSubBatchTransport) Descriptor deprecated

func (*PubSubBatchTransport) Descriptor() ([]byte, []int)

Deprecated: Use PubSubBatchTransport.ProtoReflect.Descriptor instead.

func (*PubSubBatchTransport) Deserialize

func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error)

Deserialize converts a PubSubBatchTransport to a PubSubBatch.

func (*PubSubBatchTransport) GetEnvelopes

func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope

func (*PubSubBatchTransport) GetTypeNames

func (x *PubSubBatchTransport) GetTypeNames() []string

func (*PubSubBatchTransport) ProtoMessage

func (*PubSubBatchTransport) ProtoMessage()

func (*PubSubBatchTransport) ProtoReflect

func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message

func (*PubSubBatchTransport) Reset

func (x *PubSubBatchTransport) Reset()

func (*PubSubBatchTransport) String

func (x *PubSubBatchTransport) String() string

type PubSubConfig

type PubSubConfig struct {
	// SubscriberTimeout is a timeout used when delivering a message batch to a subscriber. Default is 5s.
	//
	// This value gets rounded to seconds for optimization of cancellation token creation. Note that internally,
	// cluster request is used to deliver messages to ClusterIdentity subscribers.
	SubscriberTimeout time.Duration
}

type PubSubEnvelope

type PubSubEnvelope struct {
	TypeId       int32  `protobuf:"varint,1,opt,name=type_id,json=typeId,proto3" json:"type_id,omitempty"`
	MessageData  []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	SerializerId int32  `protobuf:"varint,3,opt,name=serializer_id,json=serializerId,proto3" json:"serializer_id,omitempty"`
	// contains filtered or unexported fields
}

Contains message byte representation and type reference

func (*PubSubEnvelope) Descriptor deprecated

func (*PubSubEnvelope) Descriptor() ([]byte, []int)

Deprecated: Use PubSubEnvelope.ProtoReflect.Descriptor instead.

func (*PubSubEnvelope) GetMessageData

func (x *PubSubEnvelope) GetMessageData() []byte

func (*PubSubEnvelope) GetSerializerId

func (x *PubSubEnvelope) GetSerializerId() int32

func (*PubSubEnvelope) GetTypeId

func (x *PubSubEnvelope) GetTypeId() int32

func (*PubSubEnvelope) ProtoMessage

func (*PubSubEnvelope) ProtoMessage()

func (*PubSubEnvelope) ProtoReflect

func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message

func (*PubSubEnvelope) Reset

func (x *PubSubEnvelope) Reset()

func (*PubSubEnvelope) String

func (x *PubSubEnvelope) String() string

type PubSubMemberDeliveryActor

type PubSubMemberDeliveryActor struct {
	// contains filtered or unexported fields
}

func NewPubSubMemberDeliveryActor

func NewPubSubMemberDeliveryActor(subscriberTimeout time.Duration, logger *slog.Logger) *PubSubMemberDeliveryActor

func (*PubSubMemberDeliveryActor) DeliverBatch

DeliverBatch delivers PubSubAutoRespondBatch to SubscriberIdentity.

func (*PubSubMemberDeliveryActor) DeliverToClusterIdentity

func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) *actor.Future

DeliverToClusterIdentity delivers PubSubAutoRespondBatch to ClusterIdentity.

func (*PubSubMemberDeliveryActor) DeliverToPid

DeliverToPid delivers PubSubAutoRespondBatch to PID.

func (*PubSubMemberDeliveryActor) Receive

func (p *PubSubMemberDeliveryActor) Receive(c actor.Context)

type PublishResponse

type PublishResponse struct {

	// Status of the whole published batch or single message
	Status PublishStatus `protobuf:"varint,1,opt,name=status,proto3,enum=cluster.PublishStatus" json:"status,omitempty"`
	// contains filtered or unexported fields
}

Publish ack/nack response

func (*PublishResponse) Descriptor deprecated

func (*PublishResponse) Descriptor() ([]byte, []int)

Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.

func (*PublishResponse) GetStatus

func (x *PublishResponse) GetStatus() PublishStatus

func (*PublishResponse) ProtoMessage

func (*PublishResponse) ProtoMessage()

func (*PublishResponse) ProtoReflect

func (x *PublishResponse) ProtoReflect() protoreflect.Message

func (*PublishResponse) Reset

func (x *PublishResponse) Reset()

func (*PublishResponse) String

func (x *PublishResponse) String() string

type PublishStatus

type PublishStatus int32

Status of the whole published batch or single message

const (
	// Batch or message was successfully published according to the delivery guarantees
	PublishStatus_Ok PublishStatus = 0
	// Topic failed to forward the message
	PublishStatus_Failed PublishStatus = 1
)

func (PublishStatus) Descriptor

func (PublishStatus) Enum

func (x PublishStatus) Enum() *PublishStatus

func (PublishStatus) EnumDescriptor deprecated

func (PublishStatus) EnumDescriptor() ([]byte, []int)

Deprecated: Use PublishStatus.Descriptor instead.

func (PublishStatus) Number

func (PublishStatus) String

func (x PublishStatus) String() string

func (PublishStatus) Type

type Publisher

type Publisher interface {
	// Initialize the internal mechanisms of this publisher.
	Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error)

	// PublishBatch publishes a batch of messages to the topic.
	PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error)

	// Publish publishes a single message to the topic.
	Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error)

	Logger() *slog.Logger
}

func NewPublisher

func NewPublisher(cluster *Cluster) Publisher

type PublisherConfig

type PublisherConfig struct {
	IdleTimeout time.Duration
}

type PublishingErrorDecision

type PublishingErrorDecision struct {
	Delay time.Duration
}

func NewPublishingErrorDecision

func NewPublishingErrorDecision(delay time.Duration) *PublishingErrorDecision

NewPublishingErrorDecision creates a new PublishingErrorDecision

func RetryBatchAfter

func RetryBatchAfter(delay time.Duration) *PublishingErrorDecision

RetryBatchAfter returns a new PublishingErrorDecision with the Delay set to the given duration

type PublishingErrorHandler

type PublishingErrorHandler func(retries int, e error, batch *PubSubBatch) *PublishingErrorDecision

PublishingErrorHandler decides what to do with a publishing error in BatchingProducer

type ReadyForRebalance

type ReadyForRebalance struct {
	TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ReadyForRebalance) Descriptor deprecated

func (*ReadyForRebalance) Descriptor() ([]byte, []int)

Deprecated: Use ReadyForRebalance.ProtoReflect.Descriptor instead.

func (*ReadyForRebalance) GetTopologyHash

func (x *ReadyForRebalance) GetTopologyHash() uint64

func (*ReadyForRebalance) ProtoMessage

func (*ReadyForRebalance) ProtoMessage()

func (*ReadyForRebalance) ProtoReflect

func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message

func (*ReadyForRebalance) Reset

func (x *ReadyForRebalance) Reset()

func (*ReadyForRebalance) String

func (x *ReadyForRebalance) String() string

type RebalanceCompleted

type RebalanceCompleted struct {
	TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*RebalanceCompleted) Descriptor deprecated

func (*RebalanceCompleted) Descriptor() ([]byte, []int)

Deprecated: Use RebalanceCompleted.ProtoReflect.Descriptor instead.

func (*RebalanceCompleted) GetTopologyHash

func (x *RebalanceCompleted) GetTopologyHash() uint64

func (*RebalanceCompleted) ProtoMessage

func (*RebalanceCompleted) ProtoMessage()

func (*RebalanceCompleted) ProtoReflect

func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message

func (*RebalanceCompleted) Reset

func (x *RebalanceCompleted) Reset()

func (*RebalanceCompleted) String

func (x *RebalanceCompleted) String() string

type RemoteIdentityHandover

type RemoteIdentityHandover struct {
	Actors       *PackedActivations `protobuf:"bytes,1,opt,name=actors,proto3" json:"actors,omitempty"`
	ChunkId      int32              `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	Final        bool               `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"`
	TopologyHash uint64             `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Skipped      int32              `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"`
	Sent         int32              `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"`
	// contains filtered or unexported fields
}

func (*RemoteIdentityHandover) Descriptor deprecated

func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)

Deprecated: Use RemoteIdentityHandover.ProtoReflect.Descriptor instead.

func (*RemoteIdentityHandover) GetActors

func (x *RemoteIdentityHandover) GetActors() *PackedActivations

func (*RemoteIdentityHandover) GetChunkId

func (x *RemoteIdentityHandover) GetChunkId() int32

func (*RemoteIdentityHandover) GetFinal

func (x *RemoteIdentityHandover) GetFinal() bool

func (*RemoteIdentityHandover) GetSent

func (x *RemoteIdentityHandover) GetSent() int32

func (*RemoteIdentityHandover) GetSkipped

func (x *RemoteIdentityHandover) GetSkipped() int32

func (*RemoteIdentityHandover) GetTopologyHash

func (x *RemoteIdentityHandover) GetTopologyHash() uint64

func (*RemoteIdentityHandover) ProtoMessage

func (*RemoteIdentityHandover) ProtoMessage()

func (*RemoteIdentityHandover) ProtoReflect

func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message

func (*RemoteIdentityHandover) Reset

func (x *RemoteIdentityHandover) Reset()

func (*RemoteIdentityHandover) String

func (x *RemoteIdentityHandover) String() string

type RemoveConsensusCheck

type RemoveConsensusCheck struct {
	ID string
}

Mimic .NET ReenterAfterCancellation on GossipActor

func NewRemoveConsensusCheck

func NewRemoveConsensusCheck(id string) RemoveConsensusCheck

type Rendezvous

type Rendezvous struct {
	// contains filtered or unexported fields
}

func NewRendezvous

func NewRendezvous() *Rendezvous

func (*Rendezvous) GetByClusterIdentity

func (r *Rendezvous) GetByClusterIdentity(ci *ClusterIdentity) string

func (*Rendezvous) GetByIdentity

func (r *Rendezvous) GetByIdentity(identity string) string

func (*Rendezvous) UpdateMembers

func (r *Rendezvous) UpdateMembers(members Members)

type SendGossipStateRequest

type SendGossipStateRequest struct{}

type SendGossipStateResponse

type SendGossipStateResponse struct{}

type SetGossipStateKey

type SetGossipStateKey struct {
	Key   string
	Value proto.Message
}

Used to setup Gossip Status Keys in the GossipActor

func NewGossipStateKey

func NewGossipStateKey(key string, value proto.Message) SetGossipStateKey

Create a new SetGossipStateKey value with the given data and return it back

type SetGossipStateResponse

type SetGossipStateResponse struct{}

Used by the GossipActor to respond SetGossipStatus requests

type SimpleRoundRobin

type SimpleRoundRobin struct {
	// contains filtered or unexported fields
}

func NewSimpleRoundRobin

func NewSimpleRoundRobin(memberStrategy MemberStrategy) *SimpleRoundRobin

func (*SimpleRoundRobin) GetByRoundRobin

func (r *SimpleRoundRobin) GetByRoundRobin() string

type SpawnLock

type SpawnLock struct {
	LockID          string
	ClusterIdentity *ClusterIdentity
}

SpawnLock contains

type StorageLookup

type StorageLookup interface {
	TryGetExistingActivation(clusterIdentity *ClusterIdentity) *StoredActivation

	TryAcquireLock(clusterIdentity *ClusterIdentity) *SpawnLock

	WaitForActivation(clusterIdentity *ClusterIdentity) *StoredActivation

	RemoveLock(spawnLock SpawnLock)

	StoreActivation(memberID string, spawnLock *SpawnLock, pid *actor.PID)

	RemoveActivation(pid *SpawnLock)

	RemoveMemberId(memberID string)
}

StorageLookup contains

type StoredActivation

type StoredActivation struct {
	Pid      string
	MemberID string
}

StoredActivation contains

type SubscribeRequest

type SubscribeRequest struct {
	Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"`
	// contains filtered or unexported fields
}

Sent to topic actor to add a subscriber

func (*SubscribeRequest) Descriptor deprecated

func (*SubscribeRequest) Descriptor() ([]byte, []int)

Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.

func (*SubscribeRequest) GetSubscriber

func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity

func (*SubscribeRequest) ProtoMessage

func (*SubscribeRequest) ProtoMessage()

func (*SubscribeRequest) ProtoReflect

func (x *SubscribeRequest) ProtoReflect() protoreflect.Message

func (*SubscribeRequest) Reset

func (x *SubscribeRequest) Reset()

func (*SubscribeRequest) String

func (x *SubscribeRequest) String() string

type SubscribeResponse

type SubscribeResponse struct {
	// contains filtered or unexported fields
}

Subscribe acknowledgement

func (*SubscribeResponse) Descriptor deprecated

func (*SubscribeResponse) Descriptor() ([]byte, []int)

Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.

func (*SubscribeResponse) ProtoMessage

func (*SubscribeResponse) ProtoMessage()

func (*SubscribeResponse) ProtoReflect

func (x *SubscribeResponse) ProtoReflect() protoreflect.Message

func (*SubscribeResponse) Reset

func (x *SubscribeResponse) Reset()

func (*SubscribeResponse) String

func (x *SubscribeResponse) String() string

type SubscriberDeliveryReport

type SubscriberDeliveryReport struct {
	Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"`
	Status     DeliveryStatus      `protobuf:"varint,2,opt,name=status,proto3,enum=cluster.DeliveryStatus" json:"status,omitempty"`
	// contains filtered or unexported fields
}

Contains information about a failed delivery

func (*SubscriberDeliveryReport) Descriptor deprecated

func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int)

Deprecated: Use SubscriberDeliveryReport.ProtoReflect.Descriptor instead.

func (*SubscriberDeliveryReport) GetStatus

func (*SubscriberDeliveryReport) GetSubscriber

func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity

func (*SubscriberDeliveryReport) ProtoMessage

func (*SubscriberDeliveryReport) ProtoMessage()

func (*SubscriberDeliveryReport) ProtoReflect

func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message

func (*SubscriberDeliveryReport) Reset

func (x *SubscriberDeliveryReport) Reset()

func (*SubscriberDeliveryReport) String

func (x *SubscriberDeliveryReport) String() string

type SubscriberIdentity

type SubscriberIdentity struct {

	// Types that are assignable to Identity:
	//	*SubscriberIdentity_Pid
	//	*SubscriberIdentity_ClusterIdentity
	Identity isSubscriberIdentity_Identity `protobuf_oneof:"Identity"`
	// contains filtered or unexported fields
}

Identifies a subscriber by either ClusterIdentity or PID

func (*SubscriberIdentity) Descriptor deprecated

func (*SubscriberIdentity) Descriptor() ([]byte, []int)

Deprecated: Use SubscriberIdentity.ProtoReflect.Descriptor instead.

func (*SubscriberIdentity) GetClusterIdentity

func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity

func (*SubscriberIdentity) GetIdentity

func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity

func (*SubscriberIdentity) GetPid

func (x *SubscriberIdentity) GetPid() *actor.PID

func (*SubscriberIdentity) ProtoMessage

func (*SubscriberIdentity) ProtoMessage()

func (*SubscriberIdentity) ProtoReflect

func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message

func (*SubscriberIdentity) Reset

func (x *SubscriberIdentity) Reset()

func (*SubscriberIdentity) String

func (x *SubscriberIdentity) String() string

type SubscriberIdentity_ClusterIdentity

type SubscriberIdentity_ClusterIdentity struct {
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3,oneof"`
}

type SubscriberIdentity_Pid

type SubscriberIdentity_Pid struct {
	Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3,oneof"`
}

type Subscribers

type Subscribers struct {
	Subscribers []*SubscriberIdentity `protobuf:"bytes,1,rep,name=subscribers,proto3" json:"subscribers,omitempty"`
	// contains filtered or unexported fields
}

A list of subscribers

func (*Subscribers) Descriptor deprecated

func (*Subscribers) Descriptor() ([]byte, []int)

Deprecated: Use Subscribers.ProtoReflect.Descriptor instead.

func (*Subscribers) GetSubscribers

func (x *Subscribers) GetSubscribers() []*SubscriberIdentity

func (*Subscribers) ProtoMessage

func (*Subscribers) ProtoMessage()

func (*Subscribers) ProtoReflect

func (x *Subscribers) ProtoReflect() protoreflect.Message

func (*Subscribers) Reset

func (x *Subscribers) Reset()

func (*Subscribers) String

func (x *Subscribers) String() string

type TestMessage

type TestMessage struct {
	Number int32 `protobuf:"varint,1,opt,name=number,proto3" json:"number,omitempty"`
	// contains filtered or unexported fields
}

func (*TestMessage) Descriptor deprecated

func (*TestMessage) Descriptor() ([]byte, []int)

Deprecated: Use TestMessage.ProtoReflect.Descriptor instead.

func (*TestMessage) GetNumber

func (x *TestMessage) GetNumber() int32

func (*TestMessage) ProtoMessage

func (*TestMessage) ProtoMessage()

func (*TestMessage) ProtoReflect

func (x *TestMessage) ProtoReflect() protoreflect.Message

func (*TestMessage) Reset

func (x *TestMessage) Reset()

func (*TestMessage) String

func (x *TestMessage) String() string

type TopicActor

type TopicActor struct {
	// contains filtered or unexported fields
}

func NewTopicActor

func NewTopicActor(store KeyValueStore[*Subscribers], logger *slog.Logger) *TopicActor

func (*TopicActor) Receive

func (t *TopicActor) Receive(c actor.Context)

type UnsubscribeRequest

type UnsubscribeRequest struct {
	Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"`
	// contains filtered or unexported fields
}

Sent to topic actor to remove a subscriber

func (*UnsubscribeRequest) Descriptor deprecated

func (*UnsubscribeRequest) Descriptor() ([]byte, []int)

Deprecated: Use UnsubscribeRequest.ProtoReflect.Descriptor instead.

func (*UnsubscribeRequest) GetSubscriber

func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity

func (*UnsubscribeRequest) ProtoMessage

func (*UnsubscribeRequest) ProtoMessage()

func (*UnsubscribeRequest) ProtoReflect

func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message

func (*UnsubscribeRequest) Reset

func (x *UnsubscribeRequest) Reset()

func (*UnsubscribeRequest) String

func (x *UnsubscribeRequest) String() string

type UnsubscribeResponse

type UnsubscribeResponse struct {
	// contains filtered or unexported fields
}

Unsubscribe acknowledgement

func (*UnsubscribeResponse) Descriptor deprecated

func (*UnsubscribeResponse) Descriptor() ([]byte, []int)

Deprecated: Use UnsubscribeResponse.ProtoReflect.Descriptor instead.

func (*UnsubscribeResponse) ProtoMessage

func (*UnsubscribeResponse) ProtoMessage()

func (*UnsubscribeResponse) ProtoReflect

func (x *UnsubscribeResponse) ProtoReflect() protoreflect.Message

func (*UnsubscribeResponse) Reset

func (x *UnsubscribeResponse) Reset()

func (*UnsubscribeResponse) String

func (x *UnsubscribeResponse) String() string

Directories

Path Synopsis
clusterproviders
k8s
zk
identitylookup

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL