Documentation ¶
Index ¶
- Constants
- Variables
- func MembersToMap(members Members) map[string]*Member
- func NewGossipConsensusHandler() *gossipConsensusHandler
- func Reason(err error) string
- func RemotePlacementActor(address string) *actor.PID
- func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)
- func SortMembers(members Members)
- func TopologyHash(members Members) uint64
- func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props
- type Acknowledge
- type ActivatedKind
- type Activation
- func (*Activation) Descriptor() ([]byte, []int)deprecated
- func (x *Activation) GetClusterIdentity() *ClusterIdentity
- func (x *Activation) GetPid() *actor.PID
- func (*Activation) ProtoMessage()
- func (x *Activation) ProtoReflect() protoreflect.Message
- func (x *Activation) Reset()
- func (x *Activation) String() string
- type ActivationRequest
- func (*ActivationRequest) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationRequest) GetRequestId() string
- func (x *ActivationRequest) GetTopologyHash() uint64
- func (*ActivationRequest) ProtoMessage()
- func (x *ActivationRequest) ProtoReflect() protoreflect.Message
- func (x *ActivationRequest) Reset()
- func (x *ActivationRequest) String() string
- type ActivationResponse
- func (*ActivationResponse) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationResponse) GetFailed() bool
- func (x *ActivationResponse) GetPid() *actor.PID
- func (x *ActivationResponse) GetTopologyHash() uint64
- func (*ActivationResponse) ProtoMessage()
- func (x *ActivationResponse) ProtoReflect() protoreflect.Message
- func (x *ActivationResponse) Reset()
- func (x *ActivationResponse) String() string
- type ActivationTerminated
- func (*ActivationTerminated) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationTerminated) GetPid() *actor.PID
- func (*ActivationTerminated) ProtoMessage()
- func (x *ActivationTerminated) ProtoReflect() protoreflect.Message
- func (x *ActivationTerminated) Reset()
- func (x *ActivationTerminated) String() string
- type ActivationTerminating
- func (*ActivationTerminating) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationTerminating) GetPid() *actor.PID
- func (*ActivationTerminating) ProtoMessage()
- func (x *ActivationTerminating) ProtoReflect() protoreflect.Message
- func (x *ActivationTerminating) Reset()
- func (x *ActivationTerminating) String() string
- type ActorStatistics
- type AddConsensusCheck
- type BatchingProducer
- type BatchingProducerConfig
- type BatchingProducerConfigOption
- func WithBatchingProducerBatchSize(batchSize int) BatchingProducerConfigOption
- func WithBatchingProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchingProducerConfigOption
- func WithBatchingProducerMaxQueueSize(maxQueueSize int) BatchingProducerConfigOption
- func WithBatchingProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchingProducerConfigOption
- func WithBatchingProducerPublishTimeout(publishTimeout time.Duration) BatchingProducerConfigOption
- func WithBatchingProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchingProducerConfigOption
- type Cluster
- func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
- func (c *Cluster) ExtensionID() extensions.ExtensionID
- func (c *Cluster) Get(identity string, kind string) *actor.PID
- func (c *Cluster) GetBlockedMembers() set.Set[string]
- func (c *Cluster) GetClusterKind(kind string) *ActivatedKind
- func (c *Cluster) GetClusterKinds() []string
- func (c *Cluster) Logger() *slog.Logger
- func (c *Cluster) Publisher() Publisher
- func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error)
- func (c *Cluster) RequestFuture(identity string, kind string, message interface{}, option ...GrainCallOption) (*actor.Future, error)
- func (c *Cluster) Shutdown(graceful bool)
- func (c *Cluster) StartClient()
- func (c *Cluster) StartMember()
- func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)
- func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- type ClusterContextConfig
- type ClusterIdentity
- func (ci *ClusterIdentity) AsKey() string
- func (*ClusterIdentity) Descriptor() ([]byte, []int)deprecated
- func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID
- func (x *ClusterIdentity) GetIdentity() string
- func (x *ClusterIdentity) GetKind() string
- func (*ClusterIdentity) ProtoMessage()
- func (x *ClusterIdentity) ProtoReflect() protoreflect.Message
- func (x *ClusterIdentity) Reset()
- func (x *ClusterIdentity) String() string
- func (ci *ClusterIdentity) ToShortString() string
- type ClusterInit
- type ClusterProvider
- type ClusterTopology
- func (*ClusterTopology) Descriptor() ([]byte, []int)deprecated
- func (x *ClusterTopology) GetBlocked() []string
- func (x *ClusterTopology) GetJoined() []*Member
- func (x *ClusterTopology) GetLeft() []*Member
- func (x *ClusterTopology) GetMembers() []*Member
- func (x *ClusterTopology) GetTopologyHash() uint64
- func (*ClusterTopology) ProtoMessage()
- func (x *ClusterTopology) ProtoReflect() protoreflect.Message
- func (x *ClusterTopology) Reset()
- func (x *ClusterTopology) String() string
- type ClusterTopologyNotification
- func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)deprecated
- func (x *ClusterTopologyNotification) GetLeaderId() string
- func (x *ClusterTopologyNotification) GetMemberId() string
- func (x *ClusterTopologyNotification) GetTopologyHash() uint32
- func (*ClusterTopologyNotification) ProtoMessage()
- func (x *ClusterTopologyNotification) ProtoReflect() protoreflect.Message
- func (x *ClusterTopologyNotification) Reset()
- func (x *ClusterTopologyNotification) String() string
- type Config
- type ConfigOption
- func WithClusterContextProducer(producer ContextProducer) ConfigOption
- func WithHeartbeatExpiration(t time.Duration) ConfigOption
- func WithKinds(kinds ...*Kind) ConfigOption
- func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption
- func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption
- func WithRequestLog(enabled bool) ConfigOption
- func WithRequestTimeout(t time.Duration) ConfigOption
- func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption
- type ConsensusCheck
- type ConsensusCheckBuilder
- func (ccb *ConsensusCheckBuilder) AffectedKeys() []string
- func (ccb *ConsensusCheckBuilder) Build() (ConsensusHandler, *ConsensusCheck)
- func (ccb *ConsensusCheckBuilder) Check() ConsensusChecker
- func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)
- func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)
- type ConsensusCheckDefinition
- type ConsensusChecker
- type ConsensusChecks
- type ConsensusHandler
- type Context
- type ContextProducer
- type DefaultContext
- type DeliverBatchRequest
- type DeliverBatchRequestTransport
- func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int)deprecated
- func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error)
- func (x *DeliverBatchRequestTransport) GetBatch() *PubSubBatchTransport
- func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers
- func (x *DeliverBatchRequestTransport) GetTopic() string
- func (*DeliverBatchRequestTransport) ProtoMessage()
- func (x *DeliverBatchRequestTransport) ProtoReflect() protoreflect.Message
- func (x *DeliverBatchRequestTransport) Reset()
- func (x *DeliverBatchRequestTransport) String() string
- type DeliveryStatus
- func (DeliveryStatus) Descriptor() protoreflect.EnumDescriptor
- func (x DeliveryStatus) Enum() *DeliveryStatus
- func (DeliveryStatus) EnumDescriptor() ([]byte, []int)deprecated
- func (x DeliveryStatus) Number() protoreflect.EnumNumber
- func (x DeliveryStatus) String() string
- func (DeliveryStatus) Type() protoreflect.EnumType
- type EmptyKeyValueStore
- type GetGossipMapKeysRequest
- type GetGossipMapKeysResponse
- type GetGossipMapStateRequest
- type GetGossipMapStateResponse
- type GetGossipStateRequest
- type GetGossipStateResponse
- type GetPid
- type Gossip
- type GossipActor
- type GossipConsensusChecker
- type GossipCore
- type GossipKeyValue
- func (*GossipKeyValue) Descriptor() ([]byte, []int)deprecated
- func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64
- func (x *GossipKeyValue) GetSequenceNumber() int64
- func (x *GossipKeyValue) GetValue() *anypb.Any
- func (*GossipKeyValue) ProtoMessage()
- func (x *GossipKeyValue) ProtoReflect() protoreflect.Message
- func (x *GossipKeyValue) Reset()
- func (x *GossipKeyValue) String() string
- type GossipKeyValues
- type GossipMap
- type GossipMemberState
- func (*GossipMemberState) Descriptor() ([]byte, []int)deprecated
- func (x *GossipMemberState) GetValues() map[string]*GossipKeyValue
- func (*GossipMemberState) ProtoMessage()
- func (x *GossipMemberState) ProtoReflect() protoreflect.Message
- func (x *GossipMemberState) Reset()
- func (x *GossipMemberState) String() string
- type GossipMemberStates
- type GossipRequest
- func (*GossipRequest) Descriptor() ([]byte, []int)deprecated
- func (x *GossipRequest) GetFromMemberId() string
- func (x *GossipRequest) GetState() *GossipState
- func (*GossipRequest) ProtoMessage()
- func (x *GossipRequest) ProtoReflect() protoreflect.Message
- func (x *GossipRequest) Reset()
- func (x *GossipRequest) String() string
- type GossipResponse
- type GossipState
- type GossipStateStorer
- type GossipUpdate
- type GossipUpdater
- type Gossiper
- func (g *Gossiper) GetActorCount() map[string]int64
- func (g *Gossiper) GetMapKeys(gossipStateKey string) []string
- func (g *Gossiper) GetMapState(gossipStateKey string, mapKey string) *anypb.Any
- func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error)
- func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler
- func (g *Gossiper) RemoveMapState(gossipStateKey string, mapKey string)
- func (g *Gossiper) SendState()
- func (g *Gossiper) SetMapState(gossipStateKey string, mapKey string, value proto.Message)
- func (g *Gossiper) SetState(gossipStateKey string, value proto.Message)
- func (g *Gossiper) SetStateRequest(key string, value proto.Message) error
- func (g *Gossiper) Shutdown()
- func (g *Gossiper) StartGossiping() error
- type GrainCallConfig
- type GrainCallOption
- type GrainContext
- type GrainErrorResponse
- func (*GrainErrorResponse) Descriptor() ([]byte, []int)deprecated
- func (m *GrainErrorResponse) Error() string
- func (m *GrainErrorResponse) Errorf(format string, args ...interface{}) error
- func (x *GrainErrorResponse) GetMessage() string
- func (x *GrainErrorResponse) GetMetadata() map[string]string
- func (x *GrainErrorResponse) GetReason() string
- func (m *GrainErrorResponse) Is(err error) bool
- func (*GrainErrorResponse) ProtoMessage()
- func (x *GrainErrorResponse) ProtoReflect() protoreflect.Message
- func (x *GrainErrorResponse) Reset()
- func (x *GrainErrorResponse) String() string
- func (m *GrainErrorResponse) WithMetadata(metadata map[string]string) *GrainErrorResponse
- type GrainRequest
- func (*GrainRequest) Descriptor() ([]byte, []int)deprecated
- func (x *GrainRequest) GetMessageData() []byte
- func (x *GrainRequest) GetMessageTypeName() string
- func (x *GrainRequest) GetMethodIndex() int32
- func (*GrainRequest) ProtoMessage()
- func (x *GrainRequest) ProtoReflect() protoreflect.Message
- func (x *GrainRequest) Reset()
- func (x *GrainRequest) String() string
- type GrainResponse
- func (*GrainResponse) Descriptor() ([]byte, []int)deprecated
- func (x *GrainResponse) GetMessageData() []byte
- func (x *GrainResponse) GetMessageTypeName() string
- func (*GrainResponse) ProtoMessage()
- func (x *GrainResponse) ProtoReflect() protoreflect.Message
- func (x *GrainResponse) Reset()
- func (x *GrainResponse) String() string
- type IdentityHandover
- func (*IdentityHandover) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandover) GetActors() []*Activation
- func (x *IdentityHandover) GetChunkId() int32
- func (x *IdentityHandover) GetFinal() bool
- func (x *IdentityHandover) GetSent() int32
- func (x *IdentityHandover) GetSkipped() int32
- func (x *IdentityHandover) GetTopologyHash() uint64
- func (*IdentityHandover) ProtoMessage()
- func (x *IdentityHandover) ProtoReflect() protoreflect.Message
- func (x *IdentityHandover) Reset()
- func (x *IdentityHandover) String() string
- type IdentityHandoverAck
- func (*IdentityHandoverAck) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverAck) GetChunkId() int32
- func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State
- func (x *IdentityHandoverAck) GetTopologyHash() uint64
- func (*IdentityHandoverAck) ProtoMessage()
- func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverAck) Reset()
- func (x *IdentityHandoverAck) String() string
- type IdentityHandoverAck_State
- func (IdentityHandoverAck_State) Descriptor() protoreflect.EnumDescriptor
- func (x IdentityHandoverAck_State) Enum() *IdentityHandoverAck_State
- func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)deprecated
- func (x IdentityHandoverAck_State) Number() protoreflect.EnumNumber
- func (x IdentityHandoverAck_State) String() string
- func (IdentityHandoverAck_State) Type() protoreflect.EnumType
- type IdentityHandoverRequest
- func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverRequest) GetAddress() string
- func (x *IdentityHandoverRequest) GetCurrentTopology() *IdentityHandoverRequest_Topology
- func (x *IdentityHandoverRequest) GetDeltaTopology() *IdentityHandoverRequest_Topology
- func (*IdentityHandoverRequest) ProtoMessage()
- func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverRequest) Reset()
- func (x *IdentityHandoverRequest) String() string
- type IdentityHandoverRequest_Topology
- func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member
- func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64
- func (*IdentityHandoverRequest_Topology) ProtoMessage()
- func (x *IdentityHandoverRequest_Topology) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverRequest_Topology) Reset()
- func (x *IdentityHandoverRequest_Topology) String() string
- type IdentityLookup
- type IdentityStorageLookup
- type IdentityStorageWorker
- type Informer
- func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)
- func (inf *Informer) CheckConsensus(updatedKeys ...string)
- func (inf *Informer) GetMapKeys(stateKey string) []string
- func (inf *Informer) GetMapState(stateKey string, mapKey string) *anypb.Any
- func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta
- func (inf *Informer) GetState(key string) map[string]*GossipKeyValue
- func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate
- func (inf *Informer) RemoveConsensusCheck(id string)
- func (inf *Informer) RemoveMapState(stateKey string, mapKey string)
- func (inf *Informer) SendState(sendStateToMember LocalStateSender)
- func (inf *Informer) SetMapState(stateKey string, mapKey string, value proto.Message)
- func (inf *Informer) SetState(key string, message proto.Message)
- func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)
- type Initialize
- type InvalidOperationException
- type KeyValueStore
- type Kind
- type LocalStateSender
- type Member
- func (m *Member) Address() string
- func (*Member) Descriptor() ([]byte, []int)deprecated
- func (x *Member) GetHost() string
- func (x *Member) GetId() string
- func (x *Member) GetKinds() []string
- func (x *Member) GetPort() int32
- func (m *Member) HasKind(kind string) bool
- func (*Member) ProtoMessage()
- func (x *Member) ProtoReflect() protoreflect.Message
- func (x *Member) Reset()
- func (x *Member) String() string
- type MemberAvailableEvent
- type MemberHeartbeat
- func (*MemberHeartbeat) Descriptor() ([]byte, []int)deprecated
- func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics
- func (*MemberHeartbeat) ProtoMessage()
- func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message
- func (x *MemberHeartbeat) Reset()
- func (x *MemberHeartbeat) String() string
- type MemberJoinedEvent
- type MemberLeftEvent
- type MemberList
- func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)
- func (ml *MemberList) ContainsMemberID(memberID string) bool
- func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string
- func (ml *MemberList) InitializeTopologyConsensus()
- func (ml *MemberList) Length() int
- func (ml *MemberList) Members() *MemberSet
- func (ml *MemberList) TerminateMember(m *Member)
- func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)
- func (ml *MemberList) UpdateClusterTopology(members Members)
- type MemberMeta
- type MemberRejoinedEvent
- type MemberSet
- func (ms *MemberSet) ContainsID(id string) bool
- func (ms *MemberSet) Equals(other *MemberSet) bool
- func (ms *MemberSet) Except(other *MemberSet) *MemberSet
- func (ms *MemberSet) ExceptIds(ids []string) *MemberSet
- func (ms *MemberSet) GetMemberById(id string) *Member
- func (ms *MemberSet) Len() int
- func (ms *MemberSet) Members() Members
- func (ms *MemberSet) TopologyHash() uint64
- func (ms *MemberSet) Union(other *MemberSet) *MemberSet
- type MemberStateDelta
- type MemberStatus
- type MemberStatusEvent
- type MemberStrategy
- type MemberUnavailableEvent
- type Members
- type NotifyAboutFailingSubscribersRequest
- func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int)deprecated
- func (x *NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries() []*SubscriberDeliveryReport
- func (*NotifyAboutFailingSubscribersRequest) ProtoMessage()
- func (x *NotifyAboutFailingSubscribersRequest) ProtoReflect() protoreflect.Message
- func (x *NotifyAboutFailingSubscribersRequest) Reset()
- func (x *NotifyAboutFailingSubscribersRequest) String() string
- type NotifyAboutFailingSubscribersResponse
- func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int)deprecated
- func (*NotifyAboutFailingSubscribersResponse) ProtoMessage()
- func (x *NotifyAboutFailingSubscribersResponse) ProtoReflect() protoreflect.Message
- func (x *NotifyAboutFailingSubscribersResponse) Reset()
- func (x *NotifyAboutFailingSubscribersResponse) String() string
- type Option
- type PackedActivations
- func (*PackedActivations) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations) GetActors() []*PackedActivations_Kind
- func (x *PackedActivations) GetAddress() string
- func (*PackedActivations) ProtoMessage()
- func (x *PackedActivations) ProtoReflect() protoreflect.Message
- func (x *PackedActivations) Reset()
- func (x *PackedActivations) String() string
- type PackedActivations_Activation
- func (*PackedActivations_Activation) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations_Activation) GetActivationId() string
- func (x *PackedActivations_Activation) GetIdentity() string
- func (*PackedActivations_Activation) ProtoMessage()
- func (x *PackedActivations_Activation) ProtoReflect() protoreflect.Message
- func (x *PackedActivations_Activation) Reset()
- func (x *PackedActivations_Activation) String() string
- type PackedActivations_Kind
- func (*PackedActivations_Kind) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations_Kind) GetActivations() []*PackedActivations_Activation
- func (x *PackedActivations_Kind) GetName() string
- func (*PackedActivations_Kind) ProtoMessage()
- func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message
- func (x *PackedActivations_Kind) Reset()
- func (x *PackedActivations_Kind) String() string
- type PidCacheValue
- func (c *PidCacheValue) Get(identity string, kind string) (*actor.PID, bool)
- func (c *PidCacheValue) Remove(identity string, kind string)
- func (c *PidCacheValue) RemoveByMember(member *Member)
- func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)
- func (c *PidCacheValue) Set(identity string, kind string, pid *actor.PID)
- type PidResult
- type ProduceProcessInfo
- type ProducerQueueFullException
- type ProxyActivationRequest
- func (*ProxyActivationRequest) Descriptor() ([]byte, []int)deprecated
- func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity
- func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID
- func (*ProxyActivationRequest) ProtoMessage()
- func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message
- func (x *ProxyActivationRequest) Reset()
- func (x *ProxyActivationRequest) String() string
- type PubSub
- type PubSubAutoRespondBatch
- type PubSubAutoRespondBatchTransport
- func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int)deprecated
- func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error)
- func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope
- func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string
- func (*PubSubAutoRespondBatchTransport) ProtoMessage()
- func (x *PubSubAutoRespondBatchTransport) ProtoReflect() protoreflect.Message
- func (x *PubSubAutoRespondBatchTransport) Reset()
- func (x *PubSubAutoRespondBatchTransport) String() string
- type PubSubBatch
- type PubSubBatchTransport
- func (*PubSubBatchTransport) Descriptor() ([]byte, []int)deprecated
- func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error)
- func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope
- func (x *PubSubBatchTransport) GetTypeNames() []string
- func (*PubSubBatchTransport) ProtoMessage()
- func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message
- func (x *PubSubBatchTransport) Reset()
- func (x *PubSubBatchTransport) String() string
- type PubSubConfig
- type PubSubEnvelope
- func (*PubSubEnvelope) Descriptor() ([]byte, []int)deprecated
- func (x *PubSubEnvelope) GetMessageData() []byte
- func (x *PubSubEnvelope) GetSerializerId() int32
- func (x *PubSubEnvelope) GetTypeId() int32
- func (*PubSubEnvelope) ProtoMessage()
- func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message
- func (x *PubSubEnvelope) Reset()
- func (x *PubSubEnvelope) String() string
- type PubSubMemberDeliveryActor
- func (p *PubSubMemberDeliveryActor) DeliverBatch(c actor.Context, batch *PubSubAutoRespondBatch, s *SubscriberIdentity) *actor.Future
- func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) *actor.Future
- func (p *PubSubMemberDeliveryActor) DeliverToPid(c actor.Context, batch *PubSubAutoRespondBatch, pid *actor.PID) *actor.Future
- func (p *PubSubMemberDeliveryActor) Receive(c actor.Context)
- type PublishResponse
- type PublishStatus
- func (PublishStatus) Descriptor() protoreflect.EnumDescriptor
- func (x PublishStatus) Enum() *PublishStatus
- func (PublishStatus) EnumDescriptor() ([]byte, []int)deprecated
- func (x PublishStatus) Number() protoreflect.EnumNumber
- func (x PublishStatus) String() string
- func (PublishStatus) Type() protoreflect.EnumType
- type Publisher
- type PublisherConfig
- type PublishingErrorDecision
- type PublishingErrorHandler
- type ReadyForRebalance
- func (*ReadyForRebalance) Descriptor() ([]byte, []int)deprecated
- func (x *ReadyForRebalance) GetTopologyHash() uint64
- func (*ReadyForRebalance) ProtoMessage()
- func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message
- func (x *ReadyForRebalance) Reset()
- func (x *ReadyForRebalance) String() string
- type RebalanceCompleted
- func (*RebalanceCompleted) Descriptor() ([]byte, []int)deprecated
- func (x *RebalanceCompleted) GetTopologyHash() uint64
- func (*RebalanceCompleted) ProtoMessage()
- func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message
- func (x *RebalanceCompleted) Reset()
- func (x *RebalanceCompleted) String() string
- type RemoteIdentityHandover
- func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)deprecated
- func (x *RemoteIdentityHandover) GetActors() *PackedActivations
- func (x *RemoteIdentityHandover) GetChunkId() int32
- func (x *RemoteIdentityHandover) GetFinal() bool
- func (x *RemoteIdentityHandover) GetSent() int32
- func (x *RemoteIdentityHandover) GetSkipped() int32
- func (x *RemoteIdentityHandover) GetTopologyHash() uint64
- func (*RemoteIdentityHandover) ProtoMessage()
- func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message
- func (x *RemoteIdentityHandover) Reset()
- func (x *RemoteIdentityHandover) String() string
- type RemoveConsensusCheck
- type RemoveGossipMapState
- type Rendezvous
- type SendGossipStateRequest
- type SendGossipStateResponse
- type SetGossipMapState
- type SetGossipState
- type SetGossipStateResponse
- type SimpleRoundRobin
- type SpawnLock
- type StorageLookup
- type StoredActivation
- type SubscribeRequest
- func (*SubscribeRequest) Descriptor() ([]byte, []int)deprecated
- func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity
- func (*SubscribeRequest) ProtoMessage()
- func (x *SubscribeRequest) ProtoReflect() protoreflect.Message
- func (x *SubscribeRequest) Reset()
- func (x *SubscribeRequest) String() string
- type SubscribeResponse
- type SubscriberDeliveryReport
- func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int)deprecated
- func (x *SubscriberDeliveryReport) GetStatus() DeliveryStatus
- func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity
- func (*SubscriberDeliveryReport) ProtoMessage()
- func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message
- func (x *SubscriberDeliveryReport) Reset()
- func (x *SubscriberDeliveryReport) String() string
- type SubscriberIdentity
- func (*SubscriberIdentity) Descriptor() ([]byte, []int)deprecated
- func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity
- func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity
- func (x *SubscriberIdentity) GetPid() *actor.PID
- func (*SubscriberIdentity) ProtoMessage()
- func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message
- func (x *SubscriberIdentity) Reset()
- func (x *SubscriberIdentity) String() string
- type SubscriberIdentity_ClusterIdentity
- type SubscriberIdentity_Pid
- type Subscribers
- type TestMessage
- type TopicActor
- type UnsubscribeRequest
- func (*UnsubscribeRequest) Descriptor() ([]byte, []int)deprecated
- func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity
- func (*UnsubscribeRequest) ProtoMessage()
- func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message
- func (x *UnsubscribeRequest) Reset()
- func (x *UnsubscribeRequest) String() string
- type UnsubscribeResponse
Constants ¶
const ( ErrorReason_OK = "OK" ErrorReason_CANCELLED = "CANCELLED" ErrorReason_UNKNOWN = "UNKNOWN" ErrorReason_INVALID_ARGUMENT = "INVALID_ARGUMENT" ErrorReason_DEADLINE_EXCEEDED = "DEADLINE_EXCEEDED" ErrorReason_NOT_FOUND = "NOT_FOUND" ErrorReason_ALREADY_EXISTS = "ALREADY_EXISTS" ErrorReason_PERMISSION_DENIED = "PERMISSION_DENIED" ErrorReason_RESOURCE_EXHAUSTED = "RESOURCE_EXHAUSTED" ErrorReason_FAILED_PRECONDITION = "FAILED_PRECONDITION" ErrorReason_ABORTED = "ABORTED" ErrorReason_OUT_OF_RANGE = "OUT_OF_RANGE" ErrorReason_UNIMPLEMENTED = "UNIMPLEMENTED" ErrorReason_INTERNAL = "INTERNAL" ErrorReason_UNAVAILABLE = "UNAVAILABLE" ErrorReason_DATA_LOSS = "DATA_LOSS" ErrorReason_UNAUTHENTICATED = "UNAUTHENTICATED" )
const ( TopologyKey string = "topology" HeartbeatKey string = "heartbeat" GracefullyLeftKey string = "left" )
const DefaultGossipActorName string = "gossip"
const PubSubDeliveryName = "$pubsub-delivery"
const TopicActorKind = "prototopic"
Variables ¶
var ( IdentityHandoverAck_State_name = map[int32]string{ 0: "processed", 1: "incorrect_topology", } IdentityHandoverAck_State_value = map[string]int32{ "processed": 0, "incorrect_topology": 1, } )
Enum value maps for IdentityHandoverAck_State.
var ( DeliveryStatus_name = map[int32]string{ 0: "Delivered", 1: "SubscriberNoLongerReachable", 2: "Timeout", 127: "OtherError", } DeliveryStatus_value = map[string]int32{ "Delivered": 0, "SubscriberNoLongerReachable": 1, "Timeout": 2, "OtherError": 127, } )
Enum value maps for DeliveryStatus.
var ( PublishStatus_name = map[int32]string{ 0: "Ok", 1: "Failed", } PublishStatus_value = map[string]int32{ "Ok": 0, "Failed": 1, } )
Enum value maps for PublishStatus.
var FailBatchAndContinue = NewPublishingErrorDecision(0)
FailBatchAndContinue skips the current batch and proceeds to the next one. The delivery reports (tasks) related to that batch are still failed with the exception that triggered the error handling.
var FailBatchAndStop = NewPublishingErrorDecision(0)
FailBatchAndStop causes the BatchingProducer to stop and fail the pending messages
var File_cluster_proto protoreflect.FileDescriptor
var File_gossip_proto protoreflect.FileDescriptor
var File_grain_proto protoreflect.FileDescriptor
var File_pubsub_proto protoreflect.FileDescriptor
var File_pubsub_test_proto protoreflect.FileDescriptor
var RetryBatchImmediately = NewPublishingErrorDecision(0)
RetryBatchImmediately retries the current batch immediately
Functions ¶
func MembersToMap ¶
func NewGossipConsensusHandler ¶
func NewGossipConsensusHandler() *gossipConsensusHandler
func RemotePlacementActor ¶
RemotePlacementActor returns the PID of the remote placement actor
func SetClusterIdentity ¶
func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)
func SortMembers ¶
func SortMembers(members Members)
func TopologyHash ¶
func WithClusterIdentity ¶
func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props
Types ¶
type Acknowledge ¶
type Acknowledge struct {
// contains filtered or unexported fields
}
func (*Acknowledge) Descriptor
deprecated
func (*Acknowledge) Descriptor() ([]byte, []int)
Deprecated: Use Acknowledge.ProtoReflect.Descriptor instead.
func (*Acknowledge) ProtoMessage ¶
func (*Acknowledge) ProtoMessage()
func (*Acknowledge) ProtoReflect ¶
func (x *Acknowledge) ProtoReflect() protoreflect.Message
func (*Acknowledge) Reset ¶
func (x *Acknowledge) Reset()
func (*Acknowledge) String ¶
func (x *Acknowledge) String() string
type ActivatedKind ¶
type ActivatedKind struct { Kind string Props *actor.Props Strategy MemberStrategy // contains filtered or unexported fields }
func (*ActivatedKind) Count ¶
func (ak *ActivatedKind) Count() int32
func (*ActivatedKind) Dec ¶
func (ak *ActivatedKind) Dec()
func (*ActivatedKind) Inc ¶
func (ak *ActivatedKind) Inc()
type Activation ¶
type Activation struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
func (*Activation) Descriptor
deprecated
func (*Activation) Descriptor() ([]byte, []int)
Deprecated: Use Activation.ProtoReflect.Descriptor instead.
func (*Activation) GetClusterIdentity ¶
func (x *Activation) GetClusterIdentity() *ClusterIdentity
func (*Activation) GetPid ¶
func (x *Activation) GetPid() *actor.PID
func (*Activation) ProtoMessage ¶
func (*Activation) ProtoMessage()
func (*Activation) ProtoReflect ¶
func (x *Activation) ProtoReflect() protoreflect.Message
func (*Activation) Reset ¶
func (x *Activation) Reset()
func (*Activation) String ¶
func (x *Activation) String() string
type ActivationRequest ¶
type ActivationRequest struct { ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` RequestId string `protobuf:"bytes,2,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` TopologyHash uint64 `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ActivationRequest) Descriptor
deprecated
func (*ActivationRequest) Descriptor() ([]byte, []int)
Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead.
func (*ActivationRequest) GetClusterIdentity ¶
func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity
func (*ActivationRequest) GetRequestId ¶
func (x *ActivationRequest) GetRequestId() string
func (*ActivationRequest) GetTopologyHash ¶
func (x *ActivationRequest) GetTopologyHash() uint64
func (*ActivationRequest) ProtoMessage ¶
func (*ActivationRequest) ProtoMessage()
func (*ActivationRequest) ProtoReflect ¶
func (x *ActivationRequest) ProtoReflect() protoreflect.Message
func (*ActivationRequest) Reset ¶
func (x *ActivationRequest) Reset()
func (*ActivationRequest) String ¶
func (x *ActivationRequest) String() string
type ActivationResponse ¶
type ActivationResponse struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` Failed bool `protobuf:"varint,2,opt,name=failed,proto3" json:"failed,omitempty"` TopologyHash uint64 `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ActivationResponse) Descriptor
deprecated
func (*ActivationResponse) Descriptor() ([]byte, []int)
Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead.
func (*ActivationResponse) GetFailed ¶
func (x *ActivationResponse) GetFailed() bool
func (*ActivationResponse) GetPid ¶
func (x *ActivationResponse) GetPid() *actor.PID
func (*ActivationResponse) GetTopologyHash ¶
func (x *ActivationResponse) GetTopologyHash() uint64
func (*ActivationResponse) ProtoMessage ¶
func (*ActivationResponse) ProtoMessage()
func (*ActivationResponse) ProtoReflect ¶
func (x *ActivationResponse) ProtoReflect() protoreflect.Message
func (*ActivationResponse) Reset ¶
func (x *ActivationResponse) Reset()
func (*ActivationResponse) String ¶
func (x *ActivationResponse) String() string
type ActivationTerminated ¶
type ActivationTerminated struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
Terminated, removed from lookup
func (*ActivationTerminated) Descriptor
deprecated
func (*ActivationTerminated) Descriptor() ([]byte, []int)
Deprecated: Use ActivationTerminated.ProtoReflect.Descriptor instead.
func (*ActivationTerminated) GetClusterIdentity ¶
func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity
func (*ActivationTerminated) GetPid ¶
func (x *ActivationTerminated) GetPid() *actor.PID
func (*ActivationTerminated) ProtoMessage ¶
func (*ActivationTerminated) ProtoMessage()
func (*ActivationTerminated) ProtoReflect ¶
func (x *ActivationTerminated) ProtoReflect() protoreflect.Message
func (*ActivationTerminated) Reset ¶
func (x *ActivationTerminated) Reset()
func (*ActivationTerminated) String ¶
func (x *ActivationTerminated) String() string
type ActivationTerminating ¶
type ActivationTerminating struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
Started terminating, not yet removed from IIdentityLookup
func (*ActivationTerminating) Descriptor
deprecated
func (*ActivationTerminating) Descriptor() ([]byte, []int)
Deprecated: Use ActivationTerminating.ProtoReflect.Descriptor instead.
func (*ActivationTerminating) GetClusterIdentity ¶
func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity
func (*ActivationTerminating) GetPid ¶
func (x *ActivationTerminating) GetPid() *actor.PID
func (*ActivationTerminating) ProtoMessage ¶
func (*ActivationTerminating) ProtoMessage()
func (*ActivationTerminating) ProtoReflect ¶
func (x *ActivationTerminating) ProtoReflect() protoreflect.Message
func (*ActivationTerminating) Reset ¶
func (x *ActivationTerminating) Reset()
func (*ActivationTerminating) String ¶
func (x *ActivationTerminating) String() string
type ActorStatistics ¶
type ActorStatistics struct { ActorCount map[string]int64 `` /* 180-byte string literal not displayed */ // contains filtered or unexported fields }
func (*ActorStatistics) Descriptor
deprecated
func (*ActorStatistics) Descriptor() ([]byte, []int)
Deprecated: Use ActorStatistics.ProtoReflect.Descriptor instead.
func (*ActorStatistics) GetActorCount ¶
func (x *ActorStatistics) GetActorCount() map[string]int64
func (*ActorStatistics) ProtoMessage ¶
func (*ActorStatistics) ProtoMessage()
func (*ActorStatistics) ProtoReflect ¶
func (x *ActorStatistics) ProtoReflect() protoreflect.Message
func (*ActorStatistics) Reset ¶
func (x *ActorStatistics) Reset()
func (*ActorStatistics) String ¶
func (x *ActorStatistics) String() string
type AddConsensusCheck ¶
type AddConsensusCheck struct { ID string Check *ConsensusCheck }
func NewAddConsensusCheck ¶
func NewAddConsensusCheck(id string, check *ConsensusCheck) AddConsensusCheck
type BatchingProducer ¶
type BatchingProducer struct {
// contains filtered or unexported fields
}
func NewBatchingProducer ¶
func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
func (*BatchingProducer) Dispose ¶
func (p *BatchingProducer) Dispose()
Dispose stops the producer and releases all resources.
func (*BatchingProducer) Produce ¶
func (p *BatchingProducer) Produce(ctx context.Context, message proto.Message) (*ProduceProcessInfo, error)
Produce a message to producer queue. The return info can be used to wait for the message to be published.
type BatchingProducerConfig ¶
type BatchingProducerConfig struct { // Maximum size of the published batch. Default: 2000. BatchSize int // Max size of the requests waiting in queue. If value is provided, the producer will throw // ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded // Note that bounded queue has better performance than unbounded queue. // Default: 0 (unbounded) MaxQueueSize int // How long to wait for the publishing to complete. // Default: 5s PublishTimeout time.Duration // Error handler that can decide what to do with an error when publishing a batch. // Default: Fail and stop the BatchingProducer OnPublishingError PublishingErrorHandler // A throttle for logging from this producer. By default, a throttle shared between all instances of // BatchingProducer is used, that allows for 10 events in 1 second. LogThrottle actor.ShouldThrottle // Optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean // up code to recover resources. PublisherIdleTimeout time.Duration }
type BatchingProducerConfigOption ¶
type BatchingProducerConfigOption func(config *BatchingProducerConfig)
func WithBatchingProducerBatchSize ¶
func WithBatchingProducerBatchSize(batchSize int) BatchingProducerConfigOption
WithBatchingProducerBatchSize sets maximum size of the published batch. Default: 2000.
func WithBatchingProducerLogThrottle ¶
func WithBatchingProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchingProducerConfigOption
WithBatchingProducerLogThrottle sets a throttle for logging from this producer. By default, a throttle shared between all instances of BatchingProducer is used, that allows for 10 events in 10 seconds.
func WithBatchingProducerMaxQueueSize ¶
func WithBatchingProducerMaxQueueSize(maxQueueSize int) BatchingProducerConfigOption
WithBatchingProducerMaxQueueSize set max size of the requests waiting in queue. If value is provided, the producer will throw ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded Note that bounded queue has better performance than unbounded queue. Default: 0 (unbounded)
func WithBatchingProducerOnPublishingError ¶
func WithBatchingProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchingProducerConfigOption
WithBatchingProducerOnPublishingError sets error handler that can decide what to do with an error when publishing a batch. Default: Fail and stop the BatchingProducer
func WithBatchingProducerPublishTimeout ¶
func WithBatchingProducerPublishTimeout(publishTimeout time.Duration) BatchingProducerConfigOption
WithBatchingProducerPublishTimeout sets how long to wait for the publishing to complete. Default: 5s
func WithBatchingProducerPublisherIdleTimeout ¶
func WithBatchingProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchingProducerConfigOption
WithBatchingProducerPublisherIdleTimeout sets an optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean up code to recover resources.
type Cluster ¶
type Cluster struct { ActorSystem *actor.ActorSystem Config *Config Gossip *Gossiper PubSub *PubSub Remote *remote.Remote PidCache *PidCacheValue MemberList *MemberList IdentityLookup IdentityLookup // contains filtered or unexported fields }
func GetCluster ¶
func GetCluster(actorSystem *actor.ActorSystem) *Cluster
func (*Cluster) BatchingProducer ¶
func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
BatchingProducer create a new PubSub batching producer for specified topic, that publishes directly to the topic actor
func (*Cluster) ExtensionID ¶
func (c *Cluster) ExtensionID() extensions.ExtensionID
func (*Cluster) GetClusterKind ¶
func (c *Cluster) GetClusterKind(kind string) *ActivatedKind
func (*Cluster) GetClusterKinds ¶
func (*Cluster) Publisher ¶
Publisher creates a new PubSub publisher that publishes messages directly to the TopicActor
func (*Cluster) Request ¶
func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error)
func (*Cluster) RequestFuture ¶
func (*Cluster) StartClient ¶
func (c *Cluster) StartClient()
func (*Cluster) StartMember ¶
func (c *Cluster) StartMember()
func (*Cluster) SubscribeByClusterIdentity ¶
func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity
func (*Cluster) SubscribeByPid ¶
func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeByPid subscribes to a PubSub topic by subscriber PID
func (*Cluster) SubscribeWithReceive ¶
func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeWithReceive subscribe to a PubSub topic by providing a Receive function, that will be used to spawn a subscriber actor
func (*Cluster) TryGetClusterKind ¶
func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)
func (*Cluster) UnsubscribeByClusterIdentity ¶
func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity
func (*Cluster) UnsubscribeByIdentityAndKind ¶
func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity
func (*Cluster) UnsubscribeByPid ¶
func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID
type ClusterContextConfig ¶
type ClusterContextConfig struct { RequestsLogThrottlePeriod time.Duration MaxNumberOfEventsInRequestLogThrottledPeriod int // contains filtered or unexported fields }
ClusterContextConfig is used to configure cluster context parameters
type ClusterIdentity ¶
type ClusterIdentity struct { Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` Kind string `protobuf:"bytes,2,opt,name=kind,proto3" json:"kind,omitempty"` // contains filtered or unexported fields }
func GetClusterIdentity ¶
func GetClusterIdentity(ctx actor.ExtensionContext) *ClusterIdentity
func NewClusterIdentity ¶
func NewClusterIdentity(identity string, kind string) *ClusterIdentity
func (*ClusterIdentity) AsKey ¶
func (ci *ClusterIdentity) AsKey() string
func (*ClusterIdentity) Descriptor
deprecated
func (*ClusterIdentity) Descriptor() ([]byte, []int)
Deprecated: Use ClusterIdentity.ProtoReflect.Descriptor instead.
func (*ClusterIdentity) ExtensionID ¶
func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID
func (*ClusterIdentity) GetIdentity ¶
func (x *ClusterIdentity) GetIdentity() string
func (*ClusterIdentity) GetKind ¶
func (x *ClusterIdentity) GetKind() string
func (*ClusterIdentity) ProtoMessage ¶
func (*ClusterIdentity) ProtoMessage()
func (*ClusterIdentity) ProtoReflect ¶
func (x *ClusterIdentity) ProtoReflect() protoreflect.Message
func (*ClusterIdentity) Reset ¶
func (x *ClusterIdentity) Reset()
func (*ClusterIdentity) String ¶
func (x *ClusterIdentity) String() string
type ClusterInit ¶
type ClusterInit struct { Identity *ClusterIdentity Cluster *Cluster }
type ClusterProvider ¶
type ClusterTopology ¶
type ClusterTopology struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Members []*Member `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"` Joined []*Member `protobuf:"bytes,3,rep,name=joined,proto3" json:"joined,omitempty"` Left []*Member `protobuf:"bytes,4,rep,name=left,proto3" json:"left,omitempty"` Blocked []string `protobuf:"bytes,5,rep,name=blocked,proto3" json:"blocked,omitempty"` // contains filtered or unexported fields }
func (*ClusterTopology) Descriptor
deprecated
func (*ClusterTopology) Descriptor() ([]byte, []int)
Deprecated: Use ClusterTopology.ProtoReflect.Descriptor instead.
func (*ClusterTopology) GetBlocked ¶
func (x *ClusterTopology) GetBlocked() []string
func (*ClusterTopology) GetJoined ¶
func (x *ClusterTopology) GetJoined() []*Member
func (*ClusterTopology) GetLeft ¶
func (x *ClusterTopology) GetLeft() []*Member
func (*ClusterTopology) GetMembers ¶
func (x *ClusterTopology) GetMembers() []*Member
func (*ClusterTopology) GetTopologyHash ¶
func (x *ClusterTopology) GetTopologyHash() uint64
func (*ClusterTopology) ProtoMessage ¶
func (*ClusterTopology) ProtoMessage()
func (*ClusterTopology) ProtoReflect ¶
func (x *ClusterTopology) ProtoReflect() protoreflect.Message
func (*ClusterTopology) Reset ¶
func (x *ClusterTopology) Reset()
func (*ClusterTopology) String ¶
func (x *ClusterTopology) String() string
type ClusterTopologyNotification ¶
type ClusterTopologyNotification struct { MemberId string `protobuf:"bytes,1,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` TopologyHash uint32 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` LeaderId string `protobuf:"bytes,3,opt,name=leader_id,json=leaderId,proto3" json:"leader_id,omitempty"` // contains filtered or unexported fields }
func (*ClusterTopologyNotification) Descriptor
deprecated
func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)
Deprecated: Use ClusterTopologyNotification.ProtoReflect.Descriptor instead.
func (*ClusterTopologyNotification) GetLeaderId ¶
func (x *ClusterTopologyNotification) GetLeaderId() string
func (*ClusterTopologyNotification) GetMemberId ¶
func (x *ClusterTopologyNotification) GetMemberId() string
func (*ClusterTopologyNotification) GetTopologyHash ¶
func (x *ClusterTopologyNotification) GetTopologyHash() uint32
func (*ClusterTopologyNotification) ProtoMessage ¶
func (*ClusterTopologyNotification) ProtoMessage()
func (*ClusterTopologyNotification) ProtoReflect ¶
func (x *ClusterTopologyNotification) ProtoReflect() protoreflect.Message
func (*ClusterTopologyNotification) Reset ¶
func (x *ClusterTopologyNotification) Reset()
func (*ClusterTopologyNotification) String ¶
func (x *ClusterTopologyNotification) String() string
type Config ¶
type Config struct { Name string Address string ClusterProvider ClusterProvider IdentityLookup IdentityLookup RemoteConfig *remote.Config RequestTimeoutTime time.Duration RequestsLogThrottlePeriod time.Duration RequestLog bool MaxNumberOfEventsInRequestLogThrottledPeriod int ClusterContextProducer ContextProducer MemberStrategyBuilder func(cluster *Cluster, kind string) MemberStrategy Kinds map[string]*Kind TimeoutTime time.Duration GossipInterval time.Duration GossipRequestTimeout time.Duration GossipFanOut int GossipMaxSend int HeartbeatExpiration time.Duration // Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the BlockList PubSubConfig *PubSubConfig }
func Configure ¶
func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config
func (*Config) ToClusterContextConfig ¶
func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig
ToClusterContextConfig converts this cluster Config Context parameters into a valid ClusterContextConfig value and returns a pointer to its memory
type ConfigOption ¶
type ConfigOption func(config *Config)
func WithClusterContextProducer ¶
func WithClusterContextProducer(producer ContextProducer) ConfigOption
WithClusterContextProducer sets the cluster context producer.
func WithHeartbeatExpiration ¶
func WithHeartbeatExpiration(t time.Duration) ConfigOption
WithHeartbeatExpiration sets the gossip heartbeat expiration.
func WithKinds ¶
func WithKinds(kinds ...*Kind) ConfigOption
func WithMaxNumberOfEventsInRequestLogThrottlePeriod ¶
func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption
WithMaxNumberOfEventsInRequestLogThrottlePeriod sets the max number of events in request log throttled period.
func WithPubSubSubscriberTimeout ¶
func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption
WithPubSubSubscriberTimeout sets a timeout used when delivering a message batch to a subscriber. Default is 5s.
func WithRequestLog ¶
func WithRequestLog(enabled bool) ConfigOption
func WithRequestTimeout ¶
func WithRequestTimeout(t time.Duration) ConfigOption
WithRequestTimeout sets the request timeout.
func WithRequestsLogThrottlePeriod ¶
func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption
WithRequestsLogThrottlePeriod sets the requests log throttle period.
type ConsensusCheck ¶
type ConsensusCheck struct {
// contains filtered or unexported fields
}
data structure helpful to store consensus check information and behavior.
func NewConsensusCheck ¶
func NewConsensusCheck(affectedKeys []string, check GossipUpdater) ConsensusCheck
creates a new ConsensusCheck value with the given data and return it back.
type ConsensusCheckBuilder ¶
type ConsensusCheckBuilder struct {
// contains filtered or unexported fields
}
func (*ConsensusCheckBuilder) AffectedKeys ¶
func (ccb *ConsensusCheckBuilder) AffectedKeys() []string
func (*ConsensusCheckBuilder) Build ¶
func (ccb *ConsensusCheckBuilder) Build() (ConsensusHandler, *ConsensusCheck)
Build builds a new ConsensusHandler and ConsensusCheck values and returns pointers to them
func (*ConsensusCheckBuilder) Check ¶
func (ccb *ConsensusCheckBuilder) Check() ConsensusChecker
func (*ConsensusCheckBuilder) HasConsensus ¶
func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)
func (*ConsensusCheckBuilder) MapToValue ¶
func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)
type ConsensusCheckDefinition ¶
type ConsensusCheckDefinition interface { Check() *ConsensusCheck AffectedKeys() map[string]struct{} }
type ConsensusChecker ¶
type ConsensusChecker func(*GossipState, map[string]empty) (bool, interface{})
ConsensusChecker Customary type used to provide consensus check callbacks of any type note: this is equivalent to (for future go v1.18):
type ConsensusChecker[T] func(GossipState, map[string]empty) (bool, T)
type ConsensusChecks ¶
type ConsensusChecks struct {
// contains filtered or unexported fields
}
acts as a storage of pointers to ConsensusCheck stored by key.
func NewConsensusChecks ¶
func NewConsensusChecks() *ConsensusChecks
creates a new ConsensusChecks value and returns a pointer to it.
func (*ConsensusChecks) Add ¶
func (cc *ConsensusChecks) Add(id string, check *ConsensusCheck)
adds a new pointer to a ConsensusCheck value in the storage and registers its affected by keys index.
func (*ConsensusChecks) GetByUpdatedKey ¶
func (cc *ConsensusChecks) GetByUpdatedKey(key string) []*ConsensusCheck
iterates over all the keys stored in the set (map[string]empty) found in the given key map and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given key.
func (*ConsensusChecks) GetByUpdatedKeys ¶
func (cc *ConsensusChecks) GetByUpdatedKeys(keys []string) []*ConsensusCheck
iterate over all the keys stored in the set (map[string]empty) found in the given key maps and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given keys with removed duplicates on it (as it is a "set").
func (*ConsensusChecks) Remove ¶
func (cc *ConsensusChecks) Remove(id string)
Remove removes the given ConsensusCheck identity from the storage and removes its affected by keys index if needed after cleaning.
type ConsensusHandler ¶
type Context ¶
type Context interface { Request(identity string, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error) RequestFuture(identity string, kind string, message interface{}, opts ...GrainCallOption) (*actor.Future, error) }
Context is an interface any cluster context needs to implement
type ContextProducer ¶
Defines a type to provide DefaultContext configurations / implementations.
type DefaultContext ¶
type DefaultContext struct {
// contains filtered or unexported fields
}
Defines a default cluster context hashBytes structure.
func (*DefaultContext) Request ¶
func (dcc *DefaultContext) Request(identity, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)
func (*DefaultContext) RequestFuture ¶
func (dcc *DefaultContext) RequestFuture(identity string, kind string, message interface{}, opts ...GrainCallOption) (*actor.Future, error)
type DeliverBatchRequest ¶
type DeliverBatchRequest struct { Subscribers *Subscribers PubSubBatch *PubSubBatch Topic string }
func (*DeliverBatchRequest) Serialize ¶
func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error)
type DeliverBatchRequestTransport ¶
type DeliverBatchRequestTransport struct { Subscribers *Subscribers `protobuf:"bytes,1,opt,name=subscribers,proto3" json:"subscribers,omitempty"` Batch *PubSubBatchTransport `protobuf:"bytes,2,opt,name=batch,proto3" json:"batch,omitempty"` Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` // contains filtered or unexported fields }
Message sent from topic to delivery actor
func (*DeliverBatchRequestTransport) Descriptor
deprecated
func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int)
Deprecated: Use DeliverBatchRequestTransport.ProtoReflect.Descriptor instead.
func (*DeliverBatchRequestTransport) Deserialize ¶
func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error)
func (*DeliverBatchRequestTransport) GetBatch ¶
func (x *DeliverBatchRequestTransport) GetBatch() *PubSubBatchTransport
func (*DeliverBatchRequestTransport) GetSubscribers ¶
func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers
func (*DeliverBatchRequestTransport) GetTopic ¶
func (x *DeliverBatchRequestTransport) GetTopic() string
func (*DeliverBatchRequestTransport) ProtoMessage ¶
func (*DeliverBatchRequestTransport) ProtoMessage()
func (*DeliverBatchRequestTransport) ProtoReflect ¶
func (x *DeliverBatchRequestTransport) ProtoReflect() protoreflect.Message
func (*DeliverBatchRequestTransport) Reset ¶
func (x *DeliverBatchRequestTransport) Reset()
func (*DeliverBatchRequestTransport) String ¶
func (x *DeliverBatchRequestTransport) String() string
type DeliveryStatus ¶
type DeliveryStatus int32
Delivery status as seen by the delivery actor
const ( // Message was put in the queue of the subscriber DeliveryStatus_Delivered DeliveryStatus = 0 // Message did not reach subscriber, because it was dead DeliveryStatus_SubscriberNoLongerReachable DeliveryStatus = 1 // Delivery timed out DeliveryStatus_Timeout DeliveryStatus = 2 // Some other problem happened DeliveryStatus_OtherError DeliveryStatus = 127 )
func (DeliveryStatus) Descriptor ¶
func (DeliveryStatus) Descriptor() protoreflect.EnumDescriptor
func (DeliveryStatus) Enum ¶
func (x DeliveryStatus) Enum() *DeliveryStatus
func (DeliveryStatus) EnumDescriptor
deprecated
func (DeliveryStatus) EnumDescriptor() ([]byte, []int)
Deprecated: Use DeliveryStatus.Descriptor instead.
func (DeliveryStatus) Number ¶
func (x DeliveryStatus) Number() protoreflect.EnumNumber
func (DeliveryStatus) String ¶
func (x DeliveryStatus) String() string
func (DeliveryStatus) Type ¶
func (DeliveryStatus) Type() protoreflect.EnumType
type EmptyKeyValueStore ¶
type EmptyKeyValueStore[T any] struct{}
EmptyKeyValueStore is a key value store that does nothing.
func (*EmptyKeyValueStore[T]) Clear ¶
func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error
type GetGossipMapKeysRequest ¶
type GetGossipMapKeysRequest struct {
GossipStateKey string
}
Used to query the GossipActor about the keys in a GossipMap
type GetGossipMapKeysResponse ¶
type GetGossipMapKeysResponse struct {
MapKeys []string
}
Used by the GossipActor to send back the keys in a GossipMap
type GetGossipMapStateRequest ¶
Used to query the Gossip State containing GossipMap data type in the GossipActor
type GetGossipMapStateResponse ¶
Used by the GossipActor to send back the GossipMap value of a given key
type GetGossipStateRequest ¶
type GetGossipStateRequest struct {
GossipStateKey string
}
Used to query the GossipActor about a given key status
func NewGetGossipStateRequest ¶
func NewGetGossipStateRequest(key string) GetGossipStateRequest
Create a new GetGossipStateRequest value and return it back
type GetGossipStateResponse ¶
type GetGossipStateResponse struct {
State map[string]*GossipKeyValue
}
Used by the GossipActor to send back the status value of a given key
func NewGetGossipStateResponse ¶
func NewGetGossipStateResponse(state map[string]*GossipKeyValue) GetGossipStateResponse
type Gossip ¶
type Gossip interface { GossipStateStorer GossipConsensusChecker GossipCore }
The Gossip interface must be implemented by any value that pretends to participate with-in the Gossip protocol
type GossipActor ¶
type GossipActor struct {
// contains filtered or unexported fields
}
Actor used to send gossip messages around
func NewGossipActor ¶
func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers func() set.Set[string], fanOut int, maxSend int, system *actor.ActorSystem) *GossipActor
Creates a new GossipActor and returns a pointer to its location in the heap
func (*GossipActor) ReceiveState ¶
func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)
type GossipConsensusChecker ¶
type GossipConsensusChecker interface { AddConsensusCheck(id string, check *ConsensusCheck) RemoveConsensusCheck(id string) }
This interface must be implemented by any value that wants to add or remove consensus checkers
type GossipCore ¶
type GossipCore interface { UpdateClusterTopology(topology *ClusterTopology) ReceiveState(remoteState *GossipState) []*GossipUpdate SendState(sendStateToMember LocalStateSender) GetMemberStateDelta(targetMemberID string) *MemberStateDelta }
This interface must be implemented by any value that wants to react to cluster topology events
type GossipKeyValue ¶
type GossipKeyValue struct { SequenceNumber int64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member Value *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` //value is any format LocalTimestampUnixMilliseconds int64 `` /* 156-byte string literal not displayed */ // contains filtered or unexported fields }
a known key might be heartbeat. if we locally tag each entry with a local timestamp this means that we can measure if we have not received a new heartbeat from one member in some time even if we don't know the exact time the heartbeat was issued, due to clock differences. we still know when _we_ as in this node, got this data. and we can measure time from then til now.
if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead
func (*GossipKeyValue) Descriptor
deprecated
func (*GossipKeyValue) Descriptor() ([]byte, []int)
Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead.
func (*GossipKeyValue) GetLocalTimestampUnixMilliseconds ¶
func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64
func (*GossipKeyValue) GetSequenceNumber ¶
func (x *GossipKeyValue) GetSequenceNumber() int64
func (*GossipKeyValue) GetValue ¶
func (x *GossipKeyValue) GetValue() *anypb.Any
func (*GossipKeyValue) ProtoMessage ¶
func (*GossipKeyValue) ProtoMessage()
func (*GossipKeyValue) ProtoReflect ¶
func (x *GossipKeyValue) ProtoReflect() protoreflect.Message
func (*GossipKeyValue) Reset ¶
func (x *GossipKeyValue) Reset()
func (*GossipKeyValue) String ¶
func (x *GossipKeyValue) String() string
type GossipKeyValues ¶
type GossipKeyValues = map[string]*GossipKeyValue
type GossipMap ¶
type GossipMap struct { Items map[string]*anypb.Any `` /* 151-byte string literal not displayed */ // contains filtered or unexported fields }
special datatype that is known by gossip actor set key remove key get keys
func (*GossipMap) Descriptor
deprecated
func (*GossipMap) ProtoMessage ¶
func (*GossipMap) ProtoMessage()
func (*GossipMap) ProtoReflect ¶
func (x *GossipMap) ProtoReflect() protoreflect.Message
type GossipMemberState ¶
type GossipMemberState struct { Values map[string]*GossipKeyValue `` /* 153-byte string literal not displayed */ // contains filtered or unexported fields }
string key is the key of the gossip value, e.g. "heartbeat" GossipKeyValue is the value of that key
func (*GossipMemberState) Descriptor
deprecated
func (*GossipMemberState) Descriptor() ([]byte, []int)
Deprecated: Use GossipMemberState.ProtoReflect.Descriptor instead.
func (*GossipMemberState) GetValues ¶
func (x *GossipMemberState) GetValues() map[string]*GossipKeyValue
func (*GossipMemberState) ProtoMessage ¶
func (*GossipMemberState) ProtoMessage()
func (*GossipMemberState) ProtoReflect ¶
func (x *GossipMemberState) ProtoReflect() protoreflect.Message
func (*GossipMemberState) Reset ¶
func (x *GossipMemberState) Reset()
func (*GossipMemberState) String ¶
func (x *GossipMemberState) String() string
type GossipMemberStates ¶
type GossipMemberStates = map[string]*GossipMemberState
type GossipRequest ¶
type GossipRequest struct { FromMemberId string `protobuf:"bytes,2,opt,name=from_member_id,json=fromMemberId,proto3" json:"from_member_id,omitempty"` State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` // contains filtered or unexported fields }
func (*GossipRequest) Descriptor
deprecated
func (*GossipRequest) Descriptor() ([]byte, []int)
Deprecated: Use GossipRequest.ProtoReflect.Descriptor instead.
func (*GossipRequest) GetFromMemberId ¶
func (x *GossipRequest) GetFromMemberId() string
func (*GossipRequest) GetState ¶
func (x *GossipRequest) GetState() *GossipState
func (*GossipRequest) ProtoMessage ¶
func (*GossipRequest) ProtoMessage()
func (*GossipRequest) ProtoReflect ¶
func (x *GossipRequest) ProtoReflect() protoreflect.Message
func (*GossipRequest) Reset ¶
func (x *GossipRequest) Reset()
func (*GossipRequest) String ¶
func (x *GossipRequest) String() string
type GossipResponse ¶
type GossipResponse struct { State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` // contains filtered or unexported fields }
Ack a gossip request
func (*GossipResponse) Descriptor
deprecated
func (*GossipResponse) Descriptor() ([]byte, []int)
Deprecated: Use GossipResponse.ProtoReflect.Descriptor instead.
func (*GossipResponse) GetState ¶
func (x *GossipResponse) GetState() *GossipState
func (*GossipResponse) ProtoMessage ¶
func (*GossipResponse) ProtoMessage()
func (*GossipResponse) ProtoReflect ¶
func (x *GossipResponse) ProtoReflect() protoreflect.Message
func (*GossipResponse) Reset ¶
func (x *GossipResponse) Reset()
func (*GossipResponse) String ¶
func (x *GossipResponse) String() string
type GossipState ¶
type GossipState struct { Members map[string]*GossipMemberState `` /* 155-byte string literal not displayed */ // contains filtered or unexported fields }
string key is member id GossipState is the state of that member
func (*GossipState) Descriptor
deprecated
func (*GossipState) Descriptor() ([]byte, []int)
Deprecated: Use GossipState.ProtoReflect.Descriptor instead.
func (*GossipState) GetMembers ¶
func (x *GossipState) GetMembers() map[string]*GossipMemberState
func (*GossipState) ProtoMessage ¶
func (*GossipState) ProtoMessage()
func (*GossipState) ProtoReflect ¶
func (x *GossipState) ProtoReflect() protoreflect.Message
func (*GossipState) Reset ¶
func (x *GossipState) Reset()
func (*GossipState) String ¶
func (x *GossipState) String() string
type GossipStateStorer ¶
type GossipStateStorer interface { GetState(key string) map[string]*GossipKeyValue SetState(key string, value proto.Message) SetMapState(stateKey string, mapKey string, value proto.Message) RemoveMapState(stateKey string, mapKey string) GetMapKeys(stateKey string) []string GetMapState(stateKey string, mapKey string) *anypb.Any }
This interface must be implemented by any value that. wants to be used as a gossip state storage
type GossipUpdate ¶
GossipUpdate Used to update gossip data when a ClusterTopology event occurs
type GossipUpdater ¶
type GossipUpdater func(*GossipState, map[string]empty)
type Gossiper ¶
type Gossiper struct { // The Gossiper Actor Name, defaults to "gossip" GossipActorName string // contains filtered or unexported fields }
The Gossiper data structure manages Gossip
func (*Gossiper) GetActorCount ¶
func (*Gossiper) GetMapKeys ¶
func (*Gossiper) GetMapState ¶
func (*Gossiper) GetState ¶
func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error)
func (*Gossiper) RegisterConsensusCheck ¶
func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler
RegisterConsensusCheck Builds a consensus handler and a consensus checker, send the checker to the Gossip actor and returns the handler back to the caller
func (*Gossiper) RemoveMapState ¶
func (*Gossiper) SetMapState ¶
func (*Gossiper) SetStateRequest ¶
SetStateRequest Sends a Request (that blocks) to update member state
func (*Gossiper) StartGossiping ¶
type GrainCallConfig ¶
type GrainCallConfig struct { RetryCount int Timeout time.Duration RetryAction func(n int) int Context actor.SenderContext }
func DefaultGrainCallConfig ¶
func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig
func NewGrainCallOptions ¶
func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig
type GrainCallOption ¶
type GrainCallOption func(config *GrainCallConfig)
func WithContext ¶
func WithContext(ctx actor.SenderContext) GrainCallOption
func WithRetryAction ¶
func WithRetryAction(act func(i int) int) GrainCallOption
func WithRetryCount ¶
func WithRetryCount(count int) GrainCallOption
func WithTimeout ¶
func WithTimeout(timeout time.Duration) GrainCallOption
type GrainContext ¶
func NewGrainContext ¶
func NewGrainContext(context actor.Context, identity *ClusterIdentity, cluster *Cluster) GrainContext
type GrainErrorResponse ¶
type GrainErrorResponse struct { Reason string `protobuf:"bytes,1,opt,name=reason,proto3" json:"reason,omitempty"` Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` Metadata map[string]string `` /* 157-byte string literal not displayed */ // contains filtered or unexported fields }
func FromError ¶
func FromError(err error) *GrainErrorResponse
func NewGrainErrorResponse ¶
func NewGrainErrorResponse(reason, message string) *GrainErrorResponse
func NewGrainErrorResponsef ¶
func NewGrainErrorResponsef(reason, format string, args ...interface{}) *GrainErrorResponse
func (*GrainErrorResponse) Descriptor
deprecated
func (*GrainErrorResponse) Descriptor() ([]byte, []int)
Deprecated: Use GrainErrorResponse.ProtoReflect.Descriptor instead.
func (*GrainErrorResponse) Error ¶
func (m *GrainErrorResponse) Error() string
func (*GrainErrorResponse) Errorf ¶
func (m *GrainErrorResponse) Errorf(format string, args ...interface{}) error
func (*GrainErrorResponse) GetMessage ¶
func (x *GrainErrorResponse) GetMessage() string
func (*GrainErrorResponse) GetMetadata ¶
func (x *GrainErrorResponse) GetMetadata() map[string]string
func (*GrainErrorResponse) GetReason ¶
func (x *GrainErrorResponse) GetReason() string
func (*GrainErrorResponse) Is ¶
func (m *GrainErrorResponse) Is(err error) bool
func (*GrainErrorResponse) ProtoMessage ¶
func (*GrainErrorResponse) ProtoMessage()
func (*GrainErrorResponse) ProtoReflect ¶
func (x *GrainErrorResponse) ProtoReflect() protoreflect.Message
func (*GrainErrorResponse) Reset ¶
func (x *GrainErrorResponse) Reset()
func (*GrainErrorResponse) String ¶
func (x *GrainErrorResponse) String() string
func (*GrainErrorResponse) WithMetadata ¶
func (m *GrainErrorResponse) WithMetadata(metadata map[string]string) *GrainErrorResponse
type GrainRequest ¶
type GrainRequest struct { MethodIndex int32 `protobuf:"varint,1,opt,name=method_index,json=methodIndex,proto3" json:"method_index,omitempty"` MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` MessageTypeName string `protobuf:"bytes,3,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"` // contains filtered or unexported fields }
func (*GrainRequest) Descriptor
deprecated
func (*GrainRequest) Descriptor() ([]byte, []int)
Deprecated: Use GrainRequest.ProtoReflect.Descriptor instead.
func (*GrainRequest) GetMessageData ¶
func (x *GrainRequest) GetMessageData() []byte
func (*GrainRequest) GetMessageTypeName ¶
func (x *GrainRequest) GetMessageTypeName() string
func (*GrainRequest) GetMethodIndex ¶
func (x *GrainRequest) GetMethodIndex() int32
func (*GrainRequest) ProtoMessage ¶
func (*GrainRequest) ProtoMessage()
func (*GrainRequest) ProtoReflect ¶
func (x *GrainRequest) ProtoReflect() protoreflect.Message
func (*GrainRequest) Reset ¶
func (x *GrainRequest) Reset()
func (*GrainRequest) String ¶
func (x *GrainRequest) String() string
type GrainResponse ¶
type GrainResponse struct { MessageData []byte `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` MessageTypeName string `protobuf:"bytes,2,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"` // contains filtered or unexported fields }
func (*GrainResponse) Descriptor
deprecated
func (*GrainResponse) Descriptor() ([]byte, []int)
Deprecated: Use GrainResponse.ProtoReflect.Descriptor instead.
func (*GrainResponse) GetMessageData ¶
func (x *GrainResponse) GetMessageData() []byte
func (*GrainResponse) GetMessageTypeName ¶
func (x *GrainResponse) GetMessageTypeName() string
func (*GrainResponse) ProtoMessage ¶
func (*GrainResponse) ProtoMessage()
func (*GrainResponse) ProtoReflect ¶
func (x *GrainResponse) ProtoReflect() protoreflect.Message
func (*GrainResponse) Reset ¶
func (x *GrainResponse) Reset()
func (*GrainResponse) String ¶
func (x *GrainResponse) String() string
type IdentityHandover ¶
type IdentityHandover struct { Actors []*Activation `protobuf:"bytes,1,rep,name=actors,proto3" json:"actors,omitempty"` ChunkId int32 `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` Final bool `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"` TopologyHash uint64 `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Skipped int32 `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` // Total number of activations skipped Sent int32 `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"` // Total number of activations sent // contains filtered or unexported fields }
func (*IdentityHandover) Descriptor
deprecated
func (*IdentityHandover) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandover.ProtoReflect.Descriptor instead.
func (*IdentityHandover) GetActors ¶
func (x *IdentityHandover) GetActors() []*Activation
func (*IdentityHandover) GetChunkId ¶
func (x *IdentityHandover) GetChunkId() int32
func (*IdentityHandover) GetFinal ¶
func (x *IdentityHandover) GetFinal() bool
func (*IdentityHandover) GetSent ¶
func (x *IdentityHandover) GetSent() int32
func (*IdentityHandover) GetSkipped ¶
func (x *IdentityHandover) GetSkipped() int32
func (*IdentityHandover) GetTopologyHash ¶
func (x *IdentityHandover) GetTopologyHash() uint64
func (*IdentityHandover) ProtoMessage ¶
func (*IdentityHandover) ProtoMessage()
func (*IdentityHandover) ProtoReflect ¶
func (x *IdentityHandover) ProtoReflect() protoreflect.Message
func (*IdentityHandover) Reset ¶
func (x *IdentityHandover) Reset()
func (*IdentityHandover) String ¶
func (x *IdentityHandover) String() string
type IdentityHandoverAck ¶
type IdentityHandoverAck struct { ChunkId int32 `protobuf:"varint,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` TopologyHash uint64 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` ProcessingState IdentityHandoverAck_State `` /* 146-byte string literal not displayed */ // contains filtered or unexported fields }
func (*IdentityHandoverAck) Descriptor
deprecated
func (*IdentityHandoverAck) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverAck.ProtoReflect.Descriptor instead.
func (*IdentityHandoverAck) GetChunkId ¶
func (x *IdentityHandoverAck) GetChunkId() int32
func (*IdentityHandoverAck) GetProcessingState ¶
func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State
func (*IdentityHandoverAck) GetTopologyHash ¶
func (x *IdentityHandoverAck) GetTopologyHash() uint64
func (*IdentityHandoverAck) ProtoMessage ¶
func (*IdentityHandoverAck) ProtoMessage()
func (*IdentityHandoverAck) ProtoReflect ¶
func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message
func (*IdentityHandoverAck) Reset ¶
func (x *IdentityHandoverAck) Reset()
func (*IdentityHandoverAck) String ¶
func (x *IdentityHandoverAck) String() string
type IdentityHandoverAck_State ¶
type IdentityHandoverAck_State int32
const ( IdentityHandoverAck_processed IdentityHandoverAck_State = 0 IdentityHandoverAck_incorrect_topology IdentityHandoverAck_State = 1 )
func (IdentityHandoverAck_State) Descriptor ¶
func (IdentityHandoverAck_State) Descriptor() protoreflect.EnumDescriptor
func (IdentityHandoverAck_State) Enum ¶
func (x IdentityHandoverAck_State) Enum() *IdentityHandoverAck_State
func (IdentityHandoverAck_State) EnumDescriptor
deprecated
func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverAck_State.Descriptor instead.
func (IdentityHandoverAck_State) Number ¶
func (x IdentityHandoverAck_State) Number() protoreflect.EnumNumber
func (IdentityHandoverAck_State) String ¶
func (x IdentityHandoverAck_State) String() string
func (IdentityHandoverAck_State) Type ¶
func (IdentityHandoverAck_State) Type() protoreflect.EnumType
type IdentityHandoverRequest ¶
type IdentityHandoverRequest struct { CurrentTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,1,opt,name=current_topology,json=currentTopology,proto3" json:"current_topology,omitempty"` Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` // If the requester passes a delta topology, only return activations which would not be assigned to the member // in the previous topology. DeltaTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,3,opt,name=delta_topology,json=deltaTopology,proto3" json:"delta_topology,omitempty"` // contains filtered or unexported fields }
request response call from Identity actor sent to each member asking what activations they hold that belong to the requester
func (*IdentityHandoverRequest) Descriptor
deprecated
func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverRequest.ProtoReflect.Descriptor instead.
func (*IdentityHandoverRequest) GetAddress ¶
func (x *IdentityHandoverRequest) GetAddress() string
func (*IdentityHandoverRequest) GetCurrentTopology ¶
func (x *IdentityHandoverRequest) GetCurrentTopology() *IdentityHandoverRequest_Topology
func (*IdentityHandoverRequest) GetDeltaTopology ¶
func (x *IdentityHandoverRequest) GetDeltaTopology() *IdentityHandoverRequest_Topology
func (*IdentityHandoverRequest) ProtoMessage ¶
func (*IdentityHandoverRequest) ProtoMessage()
func (*IdentityHandoverRequest) ProtoReflect ¶
func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message
func (*IdentityHandoverRequest) Reset ¶
func (x *IdentityHandoverRequest) Reset()
func (*IdentityHandoverRequest) String ¶
func (x *IdentityHandoverRequest) String() string
type IdentityHandoverRequest_Topology ¶
type IdentityHandoverRequest_Topology struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Members []*Member `protobuf:"bytes,3,rep,name=members,proto3" json:"members,omitempty"` // contains filtered or unexported fields }
func (*IdentityHandoverRequest_Topology) Descriptor
deprecated
func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverRequest_Topology.ProtoReflect.Descriptor instead.
func (*IdentityHandoverRequest_Topology) GetMembers ¶
func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member
func (*IdentityHandoverRequest_Topology) GetTopologyHash ¶
func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64
func (*IdentityHandoverRequest_Topology) ProtoMessage ¶
func (*IdentityHandoverRequest_Topology) ProtoMessage()
func (*IdentityHandoverRequest_Topology) ProtoReflect ¶
func (x *IdentityHandoverRequest_Topology) ProtoReflect() protoreflect.Message
func (*IdentityHandoverRequest_Topology) Reset ¶
func (x *IdentityHandoverRequest_Topology) Reset()
func (*IdentityHandoverRequest_Topology) String ¶
func (x *IdentityHandoverRequest_Topology) String() string
type IdentityLookup ¶
type IdentityLookup interface { Get(clusterIdentity *ClusterIdentity) *actor.PID RemovePid(clusterIdentity *ClusterIdentity, pid *actor.PID) Setup(cluster *Cluster, kinds []string, isClient bool) Shutdown() }
IdentityLookup contains
type IdentityStorageLookup ¶
type IdentityStorageLookup struct { Storage StorageLookup // contains filtered or unexported fields }
IdentityStorageLookup contains
func (*IdentityStorageLookup) Get ¶
func (id *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID
Get returns a PID for a given ClusterIdentity
func (*IdentityStorageLookup) RemoveMember ¶
func (i *IdentityStorageLookup) RemoveMember(memberID string)
RemoveMember from identity storage
type IdentityStorageWorker ¶
type IdentityStorageWorker struct {
// contains filtered or unexported fields
}
func (*IdentityStorageWorker) Receive ¶
func (ids *IdentityStorageWorker) Receive(c actor.Context)
Receive func
type Informer ¶
type Informer struct {
// contains filtered or unexported fields
}
The Informer data structure implements the Gossip interface
func (*Informer) AddConsensusCheck ¶
func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)
adds a new consensus checker to this informer
func (*Informer) CheckConsensus ¶
check consensus for the given keys
func (*Informer) GetMapKeys ¶
func (*Informer) GetMapState ¶
func (*Informer) GetMemberStateDelta ¶
func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta
func (*Informer) GetState ¶
func (inf *Informer) GetState(key string) map[string]*GossipKeyValue
retrieves this informer current state for the given key returns map containing each known member id and their value
func (*Informer) ReceiveState ¶
func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate
receives a remote informer state
func (*Informer) RemoveConsensusCheck ¶
removes a consensus checker from this informer
func (*Informer) RemoveMapState ¶
func (*Informer) SendState ¶
func (inf *Informer) SendState(sendStateToMember LocalStateSender)
sends this informer local state to remote informers chosen randomly from the slice of other members known by this informer until gossipFanOut number of sent has been reached
func (*Informer) SetMapState ¶
func (*Informer) UpdateClusterTopology ¶
func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)
called when there is a cluster topology update
type Initialize ¶
type Initialize struct { IdleTimeout *durationpb.Duration `protobuf:"bytes,1,opt,name=idleTimeout,proto3" json:"idleTimeout,omitempty"` // contains filtered or unexported fields }
First request to initialize the actor.
func (*Initialize) Descriptor
deprecated
func (*Initialize) Descriptor() ([]byte, []int)
Deprecated: Use Initialize.ProtoReflect.Descriptor instead.
func (*Initialize) GetIdleTimeout ¶
func (x *Initialize) GetIdleTimeout() *durationpb.Duration
func (*Initialize) ProtoMessage ¶
func (*Initialize) ProtoMessage()
func (*Initialize) ProtoReflect ¶
func (x *Initialize) ProtoReflect() protoreflect.Message
func (*Initialize) Reset ¶
func (x *Initialize) Reset()
func (*Initialize) String ¶
func (x *Initialize) String() string
type InvalidOperationException ¶
type InvalidOperationException struct {
Topic string
}
func (*InvalidOperationException) Error ¶
func (i *InvalidOperationException) Error() string
func (*InvalidOperationException) Is ¶
func (i *InvalidOperationException) Is(err error) bool
type KeyValueStore ¶
type KeyValueStore[T any] interface { // Set the value for the given key. Set(ctx context.Context, key string, value T) error // Get the value for the given key.. Get(ctx context.Context, key string) (T, error) // Clear the value for the given key. Clear(ctx context.Context, key string) error }
KeyValueStore is a distributed key value store
type Kind ¶
type Kind struct { Kind string Props *actor.Props StrategyBuilder func(*Cluster) MemberStrategy }
Kind represents the kinds of actors a cluster can manage
func (*Kind) Build ¶
func (k *Kind) Build(cluster *Cluster) *ActivatedKind
func (*Kind) WithMemberStrategy ¶
func (k *Kind) WithMemberStrategy(strategyBuilder func(*Cluster) MemberStrategy)
type LocalStateSender ¶
type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)
customary type that defines a states sender callback.
type Member ¶
type Member struct { Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` Port int32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` Kinds []string `protobuf:"bytes,4,rep,name=kinds,proto3" json:"kinds,omitempty"` // contains filtered or unexported fields }
func (*Member) Descriptor
deprecated
func (*Member) ProtoMessage ¶
func (*Member) ProtoMessage()
func (*Member) ProtoReflect ¶
func (x *Member) ProtoReflect() protoreflect.Message
type MemberAvailableEvent ¶
type MemberAvailableEvent struct {
MemberMeta
}
func (*MemberAvailableEvent) MemberStatusEvent ¶
func (*MemberAvailableEvent) MemberStatusEvent()
type MemberHeartbeat ¶
type MemberHeartbeat struct { ActorStatistics *ActorStatistics `protobuf:"bytes,1,opt,name=actor_statistics,json=actorStatistics,proto3" json:"actor_statistics,omitempty"` // contains filtered or unexported fields }
func (*MemberHeartbeat) Descriptor
deprecated
func (*MemberHeartbeat) Descriptor() ([]byte, []int)
Deprecated: Use MemberHeartbeat.ProtoReflect.Descriptor instead.
func (*MemberHeartbeat) GetActorStatistics ¶
func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics
func (*MemberHeartbeat) ProtoMessage ¶
func (*MemberHeartbeat) ProtoMessage()
func (*MemberHeartbeat) ProtoReflect ¶
func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message
func (*MemberHeartbeat) Reset ¶
func (x *MemberHeartbeat) Reset()
func (*MemberHeartbeat) String ¶
func (x *MemberHeartbeat) String() string
type MemberJoinedEvent ¶
type MemberJoinedEvent struct {
MemberMeta
}
func (*MemberJoinedEvent) MemberStatusEvent ¶
func (*MemberJoinedEvent) MemberStatusEvent()
type MemberLeftEvent ¶
type MemberLeftEvent struct {
MemberMeta
}
func (*MemberLeftEvent) MemberStatusEvent ¶
func (*MemberLeftEvent) MemberStatusEvent()
type MemberList ¶
type MemberList struct {
// contains filtered or unexported fields
}
MemberList is responsible to keep track of the current cluster topology it does so by listening to changes from the ClusterProvider. the default ClusterProvider is consul.ConsulProvider which uses the Consul HTTP API to scan for changes
func NewMemberList ¶
func NewMemberList(cluster *Cluster) *MemberList
func (*MemberList) BroadcastEvent ¶
func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)
func (*MemberList) ContainsMemberID ¶
func (ml *MemberList) ContainsMemberID(memberID string) bool
func (*MemberList) GetActivatorMember ¶
func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string
func (*MemberList) InitializeTopologyConsensus ¶
func (ml *MemberList) InitializeTopologyConsensus()
func (*MemberList) Length ¶
func (ml *MemberList) Length() int
func (*MemberList) Members ¶
func (ml *MemberList) Members() *MemberSet
func (*MemberList) TerminateMember ¶
func (ml *MemberList) TerminateMember(m *Member)
func (*MemberList) TopologyConsensus ¶
func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)
func (*MemberList) UpdateClusterTopology ¶
func (ml *MemberList) UpdateClusterTopology(members Members)
type MemberMeta ¶
func (*MemberMeta) GetKinds ¶
func (e *MemberMeta) GetKinds() []string
func (*MemberMeta) Name ¶
func (e *MemberMeta) Name() string
type MemberRejoinedEvent ¶
type MemberRejoinedEvent struct {
MemberMeta
}
func (*MemberRejoinedEvent) MemberStatusEvent ¶
func (*MemberRejoinedEvent) MemberStatusEvent()
type MemberSet ¶
type MemberSet struct {
// contains filtered or unexported fields
}
func NewMemberSet ¶
func (*MemberSet) ContainsID ¶
func (*MemberSet) GetMemberById ¶
func (*MemberSet) TopologyHash ¶
type MemberStateDelta ¶
type MemberStateDelta struct { TargetMemberID string HasState bool State *GossipState CommitOffsets func() }
type MemberStatus ¶
func (*MemberStatus) Address ¶
func (m *MemberStatus) Address() string
type MemberStatusEvent ¶
type MemberStatusEvent interface { MemberStatusEvent() GetKinds() []string }
type MemberStrategy ¶
type MemberUnavailableEvent ¶
type MemberUnavailableEvent struct {
}func (*MemberUnavailableEvent) MemberStatusEvent ¶
func (*MemberUnavailableEvent) MemberStatusEvent()
type NotifyAboutFailingSubscribersRequest ¶
type NotifyAboutFailingSubscribersRequest struct { InvalidDeliveries []*SubscriberDeliveryReport `protobuf:"bytes,1,rep,name=invalid_deliveries,json=invalidDeliveries,proto3" json:"invalid_deliveries,omitempty"` // contains filtered or unexported fields }
Message sent from delivery actor to topic to notify of subscribers that fail to process the messages
func (*NotifyAboutFailingSubscribersRequest) Descriptor
deprecated
func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int)
Deprecated: Use NotifyAboutFailingSubscribersRequest.ProtoReflect.Descriptor instead.
func (*NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries ¶
func (x *NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries() []*SubscriberDeliveryReport
func (*NotifyAboutFailingSubscribersRequest) ProtoMessage ¶
func (*NotifyAboutFailingSubscribersRequest) ProtoMessage()
func (*NotifyAboutFailingSubscribersRequest) ProtoReflect ¶
func (x *NotifyAboutFailingSubscribersRequest) ProtoReflect() protoreflect.Message
func (*NotifyAboutFailingSubscribersRequest) Reset ¶
func (x *NotifyAboutFailingSubscribersRequest) Reset()
func (*NotifyAboutFailingSubscribersRequest) String ¶
func (x *NotifyAboutFailingSubscribersRequest) String() string
type NotifyAboutFailingSubscribersResponse ¶
type NotifyAboutFailingSubscribersResponse struct {
// contains filtered or unexported fields
}
Ack to the delivery actor after notification of subscribers that fail to process the messages
func (*NotifyAboutFailingSubscribersResponse) Descriptor
deprecated
func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int)
Deprecated: Use NotifyAboutFailingSubscribersResponse.ProtoReflect.Descriptor instead.
func (*NotifyAboutFailingSubscribersResponse) ProtoMessage ¶
func (*NotifyAboutFailingSubscribersResponse) ProtoMessage()
func (*NotifyAboutFailingSubscribersResponse) ProtoReflect ¶
func (x *NotifyAboutFailingSubscribersResponse) ProtoReflect() protoreflect.Message
func (*NotifyAboutFailingSubscribersResponse) Reset ¶
func (x *NotifyAboutFailingSubscribersResponse) Reset()
func (*NotifyAboutFailingSubscribersResponse) String ¶
func (x *NotifyAboutFailingSubscribersResponse) String() string
type PackedActivations ¶
type PackedActivations struct { Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` Actors []*PackedActivations_Kind `protobuf:"bytes,2,rep,name=actors,proto3" json:"actors,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations) Descriptor
deprecated
func (*PackedActivations) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations.ProtoReflect.Descriptor instead.
func (*PackedActivations) GetActors ¶
func (x *PackedActivations) GetActors() []*PackedActivations_Kind
func (*PackedActivations) GetAddress ¶
func (x *PackedActivations) GetAddress() string
func (*PackedActivations) ProtoMessage ¶
func (*PackedActivations) ProtoMessage()
func (*PackedActivations) ProtoReflect ¶
func (x *PackedActivations) ProtoReflect() protoreflect.Message
func (*PackedActivations) Reset ¶
func (x *PackedActivations) Reset()
func (*PackedActivations) String ¶
func (x *PackedActivations) String() string
type PackedActivations_Activation ¶
type PackedActivations_Activation struct { Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` ActivationId string `protobuf:"bytes,2,opt,name=activation_id,json=activationId,proto3" json:"activation_id,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations_Activation) Descriptor
deprecated
func (*PackedActivations_Activation) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations_Activation.ProtoReflect.Descriptor instead.
func (*PackedActivations_Activation) GetActivationId ¶
func (x *PackedActivations_Activation) GetActivationId() string
func (*PackedActivations_Activation) GetIdentity ¶
func (x *PackedActivations_Activation) GetIdentity() string
func (*PackedActivations_Activation) ProtoMessage ¶
func (*PackedActivations_Activation) ProtoMessage()
func (*PackedActivations_Activation) ProtoReflect ¶
func (x *PackedActivations_Activation) ProtoReflect() protoreflect.Message
func (*PackedActivations_Activation) Reset ¶
func (x *PackedActivations_Activation) Reset()
func (*PackedActivations_Activation) String ¶
func (x *PackedActivations_Activation) String() string
type PackedActivations_Kind ¶
type PackedActivations_Kind struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Activations []*PackedActivations_Activation `protobuf:"bytes,2,rep,name=activations,proto3" json:"activations,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations_Kind) Descriptor
deprecated
func (*PackedActivations_Kind) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations_Kind.ProtoReflect.Descriptor instead.
func (*PackedActivations_Kind) GetActivations ¶
func (x *PackedActivations_Kind) GetActivations() []*PackedActivations_Activation
func (*PackedActivations_Kind) GetName ¶
func (x *PackedActivations_Kind) GetName() string
func (*PackedActivations_Kind) ProtoMessage ¶
func (*PackedActivations_Kind) ProtoMessage()
func (*PackedActivations_Kind) ProtoReflect ¶
func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message
func (*PackedActivations_Kind) Reset ¶
func (x *PackedActivations_Kind) Reset()
func (*PackedActivations_Kind) String ¶
func (x *PackedActivations_Kind) String() string
type PidCacheValue ¶
type PidCacheValue struct {
// contains filtered or unexported fields
}
func NewPidCache ¶
func NewPidCache() *PidCacheValue
func (*PidCacheValue) Remove ¶
func (c *PidCacheValue) Remove(identity string, kind string)
func (*PidCacheValue) RemoveByMember ¶
func (c *PidCacheValue) RemoveByMember(member *Member)
func (*PidCacheValue) RemoveByValue ¶
func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)
type ProduceProcessInfo ¶
type ProduceProcessInfo struct { Finished chan struct{} Err error // contains filtered or unexported fields }
ProduceProcessInfo is the context for a Produce call
func (*ProduceProcessInfo) IsCancelled ¶
func (p *ProduceProcessInfo) IsCancelled() bool
IsCancelled returns true if the context has been cancelled
func (*ProduceProcessInfo) IsFinished ¶
func (p *ProduceProcessInfo) IsFinished() bool
IsFinished returns true if the context has been finished
type ProducerQueueFullException ¶
type ProducerQueueFullException struct {
// contains filtered or unexported fields
}
func (*ProducerQueueFullException) Error ¶
func (p *ProducerQueueFullException) Error() string
func (*ProducerQueueFullException) Is ¶
func (p *ProducerQueueFullException) Is(target error) bool
type ProxyActivationRequest ¶
type ProxyActivationRequest struct { ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` ReplacedActivation *actor.PID `protobuf:"bytes,2,opt,name=replaced_activation,json=replacedActivation,proto3" json:"replaced_activation,omitempty"` // contains filtered or unexported fields }
func (*ProxyActivationRequest) Descriptor
deprecated
func (*ProxyActivationRequest) Descriptor() ([]byte, []int)
Deprecated: Use ProxyActivationRequest.ProtoReflect.Descriptor instead.
func (*ProxyActivationRequest) GetClusterIdentity ¶
func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity
func (*ProxyActivationRequest) GetReplacedActivation ¶
func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID
func (*ProxyActivationRequest) ProtoMessage ¶
func (*ProxyActivationRequest) ProtoMessage()
func (*ProxyActivationRequest) ProtoReflect ¶
func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message
func (*ProxyActivationRequest) Reset ¶
func (x *ProxyActivationRequest) Reset()
func (*ProxyActivationRequest) String ¶
func (x *ProxyActivationRequest) String() string
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
func GetPubSub ¶
func GetPubSub(system *actor.ActorSystem) *PubSub
GetPubSub returns the PubSub extension from the actor system
func (*PubSub) ExtensionID ¶
func (p *PubSub) ExtensionID() extensions.ExtensionID
type PubSubAutoRespondBatch ¶
func (*PubSubAutoRespondBatch) GetAutoResponse ¶
func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{}
GetAutoResponse returns a PublishResponse.
func (*PubSubAutoRespondBatch) GetMessages ¶
func (b *PubSubAutoRespondBatch) GetMessages() []interface{}
GetMessages returns the message.
func (*PubSubAutoRespondBatch) Serialize ¶
func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error)
Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
type PubSubAutoRespondBatchTransport ¶
type PubSubAutoRespondBatchTransport struct { TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` // contains filtered or unexported fields }
Message posted to subscriber's mailbox, that is then unrolled to single messages, and has ability to auto respond See also PubSubAutoRespondBatch
func (*PubSubAutoRespondBatchTransport) Descriptor
deprecated
func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int)
Deprecated: Use PubSubAutoRespondBatchTransport.ProtoReflect.Descriptor instead.
func (*PubSubAutoRespondBatchTransport) Deserialize ¶
func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error)
Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
func (*PubSubAutoRespondBatchTransport) GetEnvelopes ¶
func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope
func (*PubSubAutoRespondBatchTransport) GetTypeNames ¶
func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string
func (*PubSubAutoRespondBatchTransport) ProtoMessage ¶
func (*PubSubAutoRespondBatchTransport) ProtoMessage()
func (*PubSubAutoRespondBatchTransport) ProtoReflect ¶
func (x *PubSubAutoRespondBatchTransport) ProtoReflect() protoreflect.Message
func (*PubSubAutoRespondBatchTransport) Reset ¶
func (x *PubSubAutoRespondBatchTransport) Reset()
func (*PubSubAutoRespondBatchTransport) String ¶
func (x *PubSubAutoRespondBatchTransport) String() string
type PubSubBatch ¶
func (*PubSubBatch) Serialize ¶
func (b *PubSubBatch) Serialize() (remote.RootSerialized, error)
Serialize converts a PubSubBatch to a PubSubBatchTransport.
type PubSubBatchTransport ¶
type PubSubBatchTransport struct { TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` // contains filtered or unexported fields }
Message sent from publisher to topic actor See also PubSubBatch
func (*PubSubBatchTransport) Descriptor
deprecated
func (*PubSubBatchTransport) Descriptor() ([]byte, []int)
Deprecated: Use PubSubBatchTransport.ProtoReflect.Descriptor instead.
func (*PubSubBatchTransport) Deserialize ¶
func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error)
Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (*PubSubBatchTransport) GetEnvelopes ¶
func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope
func (*PubSubBatchTransport) GetTypeNames ¶
func (x *PubSubBatchTransport) GetTypeNames() []string
func (*PubSubBatchTransport) ProtoMessage ¶
func (*PubSubBatchTransport) ProtoMessage()
func (*PubSubBatchTransport) ProtoReflect ¶
func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message
func (*PubSubBatchTransport) Reset ¶
func (x *PubSubBatchTransport) Reset()
func (*PubSubBatchTransport) String ¶
func (x *PubSubBatchTransport) String() string
type PubSubConfig ¶
type PubSubConfig struct { // SubscriberTimeout is a timeout used when delivering a message batch to a subscriber. Default is 5s. // // This value gets rounded to seconds for optimization of cancellation token creation. Note that internally, // cluster request is used to deliver messages to ClusterIdentity subscribers. SubscriberTimeout time.Duration }
type PubSubEnvelope ¶
type PubSubEnvelope struct { TypeId int32 `protobuf:"varint,1,opt,name=type_id,json=typeId,proto3" json:"type_id,omitempty"` MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` SerializerId int32 `protobuf:"varint,3,opt,name=serializer_id,json=serializerId,proto3" json:"serializer_id,omitempty"` // contains filtered or unexported fields }
Contains message byte representation and type reference
func (*PubSubEnvelope) Descriptor
deprecated
func (*PubSubEnvelope) Descriptor() ([]byte, []int)
Deprecated: Use PubSubEnvelope.ProtoReflect.Descriptor instead.
func (*PubSubEnvelope) GetMessageData ¶
func (x *PubSubEnvelope) GetMessageData() []byte
func (*PubSubEnvelope) GetSerializerId ¶
func (x *PubSubEnvelope) GetSerializerId() int32
func (*PubSubEnvelope) GetTypeId ¶
func (x *PubSubEnvelope) GetTypeId() int32
func (*PubSubEnvelope) ProtoMessage ¶
func (*PubSubEnvelope) ProtoMessage()
func (*PubSubEnvelope) ProtoReflect ¶
func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message
func (*PubSubEnvelope) Reset ¶
func (x *PubSubEnvelope) Reset()
func (*PubSubEnvelope) String ¶
func (x *PubSubEnvelope) String() string
type PubSubMemberDeliveryActor ¶
type PubSubMemberDeliveryActor struct {
// contains filtered or unexported fields
}
func NewPubSubMemberDeliveryActor ¶
func NewPubSubMemberDeliveryActor(subscriberTimeout time.Duration, logger *slog.Logger) *PubSubMemberDeliveryActor
func (*PubSubMemberDeliveryActor) DeliverBatch ¶
func (p *PubSubMemberDeliveryActor) DeliverBatch(c actor.Context, batch *PubSubAutoRespondBatch, s *SubscriberIdentity) *actor.Future
DeliverBatch delivers PubSubAutoRespondBatch to SubscriberIdentity.
func (*PubSubMemberDeliveryActor) DeliverToClusterIdentity ¶
func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) *actor.Future
DeliverToClusterIdentity delivers PubSubAutoRespondBatch to ClusterIdentity.
func (*PubSubMemberDeliveryActor) DeliverToPid ¶
func (p *PubSubMemberDeliveryActor) DeliverToPid(c actor.Context, batch *PubSubAutoRespondBatch, pid *actor.PID) *actor.Future
DeliverToPid delivers PubSubAutoRespondBatch to PID.
func (*PubSubMemberDeliveryActor) Receive ¶
func (p *PubSubMemberDeliveryActor) Receive(c actor.Context)
type PublishResponse ¶
type PublishResponse struct { // Status of the whole published batch or single message Status PublishStatus `protobuf:"varint,1,opt,name=status,proto3,enum=cluster.PublishStatus" json:"status,omitempty"` // contains filtered or unexported fields }
Publish ack/nack response
func (*PublishResponse) Descriptor
deprecated
func (*PublishResponse) Descriptor() ([]byte, []int)
Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.
func (*PublishResponse) GetStatus ¶
func (x *PublishResponse) GetStatus() PublishStatus
func (*PublishResponse) ProtoMessage ¶
func (*PublishResponse) ProtoMessage()
func (*PublishResponse) ProtoReflect ¶
func (x *PublishResponse) ProtoReflect() protoreflect.Message
func (*PublishResponse) Reset ¶
func (x *PublishResponse) Reset()
func (*PublishResponse) String ¶
func (x *PublishResponse) String() string
type PublishStatus ¶
type PublishStatus int32
Status of the whole published batch or single message
const ( // Batch or message was successfully published according to the delivery guarantees PublishStatus_Ok PublishStatus = 0 // Topic failed to forward the message PublishStatus_Failed PublishStatus = 1 )
func (PublishStatus) Descriptor ¶
func (PublishStatus) Descriptor() protoreflect.EnumDescriptor
func (PublishStatus) Enum ¶
func (x PublishStatus) Enum() *PublishStatus
func (PublishStatus) EnumDescriptor
deprecated
func (PublishStatus) EnumDescriptor() ([]byte, []int)
Deprecated: Use PublishStatus.Descriptor instead.
func (PublishStatus) Number ¶
func (x PublishStatus) Number() protoreflect.EnumNumber
func (PublishStatus) String ¶
func (x PublishStatus) String() string
func (PublishStatus) Type ¶
func (PublishStatus) Type() protoreflect.EnumType
type Publisher ¶
type Publisher interface { // Initialize the internal mechanisms of this publisher. Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) // PublishBatch publishes a batch of messages to the topic. PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error) // Publish publishes a single message to the topic. Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) Logger() *slog.Logger }
func NewPublisher ¶
type PublisherConfig ¶
type PublishingErrorDecision ¶
func NewPublishingErrorDecision ¶
func NewPublishingErrorDecision(delay time.Duration) *PublishingErrorDecision
NewPublishingErrorDecision creates a new PublishingErrorDecision
func RetryBatchAfter ¶
func RetryBatchAfter(delay time.Duration) *PublishingErrorDecision
RetryBatchAfter returns a new PublishingErrorDecision with the Delay set to the given duration
type PublishingErrorHandler ¶
type PublishingErrorHandler func(retries int, e error, batch *PubSubBatch) *PublishingErrorDecision
PublishingErrorHandler decides what to do with a publishing error in BatchingProducer
type ReadyForRebalance ¶
type ReadyForRebalance struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ReadyForRebalance) Descriptor
deprecated
func (*ReadyForRebalance) Descriptor() ([]byte, []int)
Deprecated: Use ReadyForRebalance.ProtoReflect.Descriptor instead.
func (*ReadyForRebalance) GetTopologyHash ¶
func (x *ReadyForRebalance) GetTopologyHash() uint64
func (*ReadyForRebalance) ProtoMessage ¶
func (*ReadyForRebalance) ProtoMessage()
func (*ReadyForRebalance) ProtoReflect ¶
func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message
func (*ReadyForRebalance) Reset ¶
func (x *ReadyForRebalance) Reset()
func (*ReadyForRebalance) String ¶
func (x *ReadyForRebalance) String() string
type RebalanceCompleted ¶
type RebalanceCompleted struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*RebalanceCompleted) Descriptor
deprecated
func (*RebalanceCompleted) Descriptor() ([]byte, []int)
Deprecated: Use RebalanceCompleted.ProtoReflect.Descriptor instead.
func (*RebalanceCompleted) GetTopologyHash ¶
func (x *RebalanceCompleted) GetTopologyHash() uint64
func (*RebalanceCompleted) ProtoMessage ¶
func (*RebalanceCompleted) ProtoMessage()
func (*RebalanceCompleted) ProtoReflect ¶
func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message
func (*RebalanceCompleted) Reset ¶
func (x *RebalanceCompleted) Reset()
func (*RebalanceCompleted) String ¶
func (x *RebalanceCompleted) String() string
type RemoteIdentityHandover ¶
type RemoteIdentityHandover struct { Actors *PackedActivations `protobuf:"bytes,1,opt,name=actors,proto3" json:"actors,omitempty"` ChunkId int32 `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` Final bool `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"` TopologyHash uint64 `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Skipped int32 `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` Sent int32 `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"` // contains filtered or unexported fields }
func (*RemoteIdentityHandover) Descriptor
deprecated
func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)
Deprecated: Use RemoteIdentityHandover.ProtoReflect.Descriptor instead.
func (*RemoteIdentityHandover) GetActors ¶
func (x *RemoteIdentityHandover) GetActors() *PackedActivations
func (*RemoteIdentityHandover) GetChunkId ¶
func (x *RemoteIdentityHandover) GetChunkId() int32
func (*RemoteIdentityHandover) GetFinal ¶
func (x *RemoteIdentityHandover) GetFinal() bool
func (*RemoteIdentityHandover) GetSent ¶
func (x *RemoteIdentityHandover) GetSent() int32
func (*RemoteIdentityHandover) GetSkipped ¶
func (x *RemoteIdentityHandover) GetSkipped() int32
func (*RemoteIdentityHandover) GetTopologyHash ¶
func (x *RemoteIdentityHandover) GetTopologyHash() uint64
func (*RemoteIdentityHandover) ProtoMessage ¶
func (*RemoteIdentityHandover) ProtoMessage()
func (*RemoteIdentityHandover) ProtoReflect ¶
func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message
func (*RemoteIdentityHandover) Reset ¶
func (x *RemoteIdentityHandover) Reset()
func (*RemoteIdentityHandover) String ¶
func (x *RemoteIdentityHandover) String() string
type RemoveConsensusCheck ¶
type RemoveConsensusCheck struct {
ID string
}
Mimic .NET ReenterAfterCancellation on GossipActor
func NewRemoveConsensusCheck ¶
func NewRemoveConsensusCheck(id string) RemoveConsensusCheck
type RemoveGossipMapState ¶
Used to remove Gossip State containing GossipMap data type in the GossipActor
type Rendezvous ¶
type Rendezvous struct {
// contains filtered or unexported fields
}
func NewRendezvous ¶
func NewRendezvous() *Rendezvous
func (*Rendezvous) GetByClusterIdentity ¶
func (r *Rendezvous) GetByClusterIdentity(ci *ClusterIdentity) string
func (*Rendezvous) GetByIdentity ¶
func (r *Rendezvous) GetByIdentity(identity string) string
func (*Rendezvous) UpdateMembers ¶
func (r *Rendezvous) UpdateMembers(members Members)
type SendGossipStateRequest ¶
type SendGossipStateRequest struct{}
type SendGossipStateResponse ¶
type SendGossipStateResponse struct{}
type SetGossipMapState ¶
Used to set Gossip State containing GossipMap data type in the GossipActor
type SetGossipState ¶
Used to setup Gossip State Keys in the GossipActor
func NewGossipStateKey ¶
func NewGossipStateKey(key string, value proto.Message) SetGossipState
Create a new SetGossipState value with the given data and return it back
type SetGossipStateResponse ¶
type SetGossipStateResponse struct{}
Used by the GossipActor to respond SetGossipStatus requests
type SimpleRoundRobin ¶
type SimpleRoundRobin struct {
// contains filtered or unexported fields
}
func NewSimpleRoundRobin ¶
func NewSimpleRoundRobin(memberStrategy MemberStrategy) *SimpleRoundRobin
func (*SimpleRoundRobin) GetByRoundRobin ¶
func (r *SimpleRoundRobin) GetByRoundRobin() string
type SpawnLock ¶
type SpawnLock struct { LockID string ClusterIdentity *ClusterIdentity }
SpawnLock contains
type StorageLookup ¶
type StorageLookup interface { TryGetExistingActivation(clusterIdentity *ClusterIdentity) *StoredActivation TryAcquireLock(clusterIdentity *ClusterIdentity) *SpawnLock WaitForActivation(clusterIdentity *ClusterIdentity) *StoredActivation RemoveLock(spawnLock SpawnLock) StoreActivation(memberID string, spawnLock *SpawnLock, pid *actor.PID) RemoveActivation(pid *SpawnLock) RemoveMemberId(memberID string) }
StorageLookup contains
type StoredActivation ¶
StoredActivation contains
type SubscribeRequest ¶
type SubscribeRequest struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` // contains filtered or unexported fields }
Sent to topic actor to add a subscriber
func (*SubscribeRequest) Descriptor
deprecated
func (*SubscribeRequest) Descriptor() ([]byte, []int)
Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) GetSubscriber ¶
func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity
func (*SubscribeRequest) ProtoMessage ¶
func (*SubscribeRequest) ProtoMessage()
func (*SubscribeRequest) ProtoReflect ¶
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message
func (*SubscribeRequest) Reset ¶
func (x *SubscribeRequest) Reset()
func (*SubscribeRequest) String ¶
func (x *SubscribeRequest) String() string
type SubscribeResponse ¶
type SubscribeResponse struct {
// contains filtered or unexported fields
}
Subscribe acknowledgement
func (*SubscribeResponse) Descriptor
deprecated
func (*SubscribeResponse) Descriptor() ([]byte, []int)
Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) ProtoMessage ¶
func (*SubscribeResponse) ProtoMessage()
func (*SubscribeResponse) ProtoReflect ¶
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message
func (*SubscribeResponse) Reset ¶
func (x *SubscribeResponse) Reset()
func (*SubscribeResponse) String ¶
func (x *SubscribeResponse) String() string
type SubscriberDeliveryReport ¶
type SubscriberDeliveryReport struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` Status DeliveryStatus `protobuf:"varint,2,opt,name=status,proto3,enum=cluster.DeliveryStatus" json:"status,omitempty"` // contains filtered or unexported fields }
Contains information about a failed delivery
func (*SubscriberDeliveryReport) Descriptor
deprecated
func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int)
Deprecated: Use SubscriberDeliveryReport.ProtoReflect.Descriptor instead.
func (*SubscriberDeliveryReport) GetStatus ¶
func (x *SubscriberDeliveryReport) GetStatus() DeliveryStatus
func (*SubscriberDeliveryReport) GetSubscriber ¶
func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity
func (*SubscriberDeliveryReport) ProtoMessage ¶
func (*SubscriberDeliveryReport) ProtoMessage()
func (*SubscriberDeliveryReport) ProtoReflect ¶
func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message
func (*SubscriberDeliveryReport) Reset ¶
func (x *SubscriberDeliveryReport) Reset()
func (*SubscriberDeliveryReport) String ¶
func (x *SubscriberDeliveryReport) String() string
type SubscriberIdentity ¶
type SubscriberIdentity struct { // Types that are assignable to Identity: // *SubscriberIdentity_Pid // *SubscriberIdentity_ClusterIdentity Identity isSubscriberIdentity_Identity `protobuf_oneof:"Identity"` // contains filtered or unexported fields }
Identifies a subscriber by either ClusterIdentity or PID
func (*SubscriberIdentity) Descriptor
deprecated
func (*SubscriberIdentity) Descriptor() ([]byte, []int)
Deprecated: Use SubscriberIdentity.ProtoReflect.Descriptor instead.
func (*SubscriberIdentity) GetClusterIdentity ¶
func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity
func (*SubscriberIdentity) GetIdentity ¶
func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity
func (*SubscriberIdentity) GetPid ¶
func (x *SubscriberIdentity) GetPid() *actor.PID
func (*SubscriberIdentity) ProtoMessage ¶
func (*SubscriberIdentity) ProtoMessage()
func (*SubscriberIdentity) ProtoReflect ¶
func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message
func (*SubscriberIdentity) Reset ¶
func (x *SubscriberIdentity) Reset()
func (*SubscriberIdentity) String ¶
func (x *SubscriberIdentity) String() string
type SubscriberIdentity_ClusterIdentity ¶
type SubscriberIdentity_ClusterIdentity struct {
ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3,oneof"`
}
type SubscriberIdentity_Pid ¶
type Subscribers ¶
type Subscribers struct { Subscribers []*SubscriberIdentity `protobuf:"bytes,1,rep,name=subscribers,proto3" json:"subscribers,omitempty"` // contains filtered or unexported fields }
A list of subscribers
func (*Subscribers) Descriptor
deprecated
func (*Subscribers) Descriptor() ([]byte, []int)
Deprecated: Use Subscribers.ProtoReflect.Descriptor instead.
func (*Subscribers) GetSubscribers ¶
func (x *Subscribers) GetSubscribers() []*SubscriberIdentity
func (*Subscribers) ProtoMessage ¶
func (*Subscribers) ProtoMessage()
func (*Subscribers) ProtoReflect ¶
func (x *Subscribers) ProtoReflect() protoreflect.Message
func (*Subscribers) Reset ¶
func (x *Subscribers) Reset()
func (*Subscribers) String ¶
func (x *Subscribers) String() string
type TestMessage ¶
type TestMessage struct { Number int32 `protobuf:"varint,1,opt,name=number,proto3" json:"number,omitempty"` // contains filtered or unexported fields }
func (*TestMessage) Descriptor
deprecated
func (*TestMessage) Descriptor() ([]byte, []int)
Deprecated: Use TestMessage.ProtoReflect.Descriptor instead.
func (*TestMessage) GetNumber ¶
func (x *TestMessage) GetNumber() int32
func (*TestMessage) ProtoMessage ¶
func (*TestMessage) ProtoMessage()
func (*TestMessage) ProtoReflect ¶
func (x *TestMessage) ProtoReflect() protoreflect.Message
func (*TestMessage) Reset ¶
func (x *TestMessage) Reset()
func (*TestMessage) String ¶
func (x *TestMessage) String() string
type TopicActor ¶
type TopicActor struct {
// contains filtered or unexported fields
}
func NewTopicActor ¶
func NewTopicActor(store KeyValueStore[*Subscribers], logger *slog.Logger) *TopicActor
func (*TopicActor) Receive ¶
func (t *TopicActor) Receive(c actor.Context)
type UnsubscribeRequest ¶
type UnsubscribeRequest struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` // contains filtered or unexported fields }
Sent to topic actor to remove a subscriber
func (*UnsubscribeRequest) Descriptor
deprecated
func (*UnsubscribeRequest) Descriptor() ([]byte, []int)
Deprecated: Use UnsubscribeRequest.ProtoReflect.Descriptor instead.
func (*UnsubscribeRequest) GetSubscriber ¶
func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity
func (*UnsubscribeRequest) ProtoMessage ¶
func (*UnsubscribeRequest) ProtoMessage()
func (*UnsubscribeRequest) ProtoReflect ¶
func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message
func (*UnsubscribeRequest) Reset ¶
func (x *UnsubscribeRequest) Reset()
func (*UnsubscribeRequest) String ¶
func (x *UnsubscribeRequest) String() string
type UnsubscribeResponse ¶
type UnsubscribeResponse struct {
// contains filtered or unexported fields
}
Unsubscribe acknowledgement
func (*UnsubscribeResponse) Descriptor
deprecated
func (*UnsubscribeResponse) Descriptor() ([]byte, []int)
Deprecated: Use UnsubscribeResponse.ProtoReflect.Descriptor instead.
func (*UnsubscribeResponse) ProtoMessage ¶
func (*UnsubscribeResponse) ProtoMessage()
func (*UnsubscribeResponse) ProtoReflect ¶
func (x *UnsubscribeResponse) ProtoReflect() protoreflect.Message
func (*UnsubscribeResponse) Reset ¶
func (x *UnsubscribeResponse) Reset()
func (*UnsubscribeResponse) String ¶
func (x *UnsubscribeResponse) String() string
Source Files ¶
- cluster.go
- cluster.pb.go
- cluster_config_context.go
- cluster_identity.go
- cluster_provider.go
- config.go
- config_opts.go
- consensus.go
- consensus_check_builder.go
- consensus_checks.go
- context.go
- default_context.go
- errors.go
- gossip.go
- gossip.pb.go
- gossip_actor.go
- gossip_state_management.go
- gossiper.go
- grain.go
- grain.pb.go
- grain_context.go
- identity_lookup.go
- identity_storage_lookup.go
- identity_storage_worker.go
- informer.go
- key_value_store.go
- kind.go
- member.go
- member_list.go
- member_state_delta.go
- member_status.go
- member_status_events.go
- member_strategy.go
- members.go
- messages.go
- options.go
- pid_cache.go
- pubsub.go
- pubsub.pb.go
- pubsub_batch.go
- pubsub_delivery.go
- pubsub_extensions.go
- pubsub_producer.go
- pubsub_producer_opts.go
- pubsub_publisher.go
- pubsub_test.pb.go
- pubsub_topic.go
- rendezvous.go
- round_robin.go
- types.go