Documentation ¶
Index ¶
- Constants
- Variables
- func MembersToMap(members Members) map[string]*Member
- func NewGossipConsensusHandler() *gossipConsensusHandler
- func RemotePlacementActor(address string) *actor.PID
- func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)
- func SortMembers(members Members)
- func TopologyHash(members Members) uint64
- func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props
- type Acknowledge
- type ActivatedKind
- type Activation
- func (*Activation) Descriptor() ([]byte, []int)deprecated
- func (x *Activation) GetClusterIdentity() *ClusterIdentity
- func (x *Activation) GetPid() *actor.PID
- func (*Activation) ProtoMessage()
- func (x *Activation) ProtoReflect() protoreflect.Message
- func (x *Activation) Reset()
- func (x *Activation) String() string
- type ActivationRequest
- func (*ActivationRequest) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationRequest) GetRequestId() string
- func (x *ActivationRequest) GetTopologyHash() uint64
- func (*ActivationRequest) ProtoMessage()
- func (x *ActivationRequest) ProtoReflect() protoreflect.Message
- func (x *ActivationRequest) Reset()
- func (x *ActivationRequest) String() string
- type ActivationResponse
- func (*ActivationResponse) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationResponse) GetFailed() bool
- func (x *ActivationResponse) GetPid() *actor.PID
- func (x *ActivationResponse) GetTopologyHash() uint64
- func (*ActivationResponse) ProtoMessage()
- func (x *ActivationResponse) ProtoReflect() protoreflect.Message
- func (x *ActivationResponse) Reset()
- func (x *ActivationResponse) String() string
- type ActivationTerminated
- func (*ActivationTerminated) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationTerminated) GetPid() *actor.PID
- func (*ActivationTerminated) ProtoMessage()
- func (x *ActivationTerminated) ProtoReflect() protoreflect.Message
- func (x *ActivationTerminated) Reset()
- func (x *ActivationTerminated) String() string
- type ActivationTerminating
- func (*ActivationTerminating) Descriptor() ([]byte, []int)deprecated
- func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity
- func (x *ActivationTerminating) GetPid() *actor.PID
- func (*ActivationTerminating) ProtoMessage()
- func (x *ActivationTerminating) ProtoReflect() protoreflect.Message
- func (x *ActivationTerminating) Reset()
- func (x *ActivationTerminating) String() string
- type ActorStatistics
- type AddConsensusCheck
- type BatchingProducer
- type BatchingProducerConfig
- type BatchingProducerConfigOption
- func WithBatchingProducerBatchSize(batchSize int) BatchingProducerConfigOption
- func WithBatchingProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchingProducerConfigOption
- func WithBatchingProducerMaxQueueSize(maxQueueSize int) BatchingProducerConfigOption
- func WithBatchingProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchingProducerConfigOption
- func WithBatchingProducerPublishTimeout(publishTimeout time.Duration) BatchingProducerConfigOption
- func WithBatchingProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchingProducerConfigOption
- type Cluster
- func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
- func (c *Cluster) ExtensionID() extensions.ExtensionID
- func (c *Cluster) Get(identity string, kind string) *actor.PID
- func (c *Cluster) GetBlockedMembers() set.Set[string]
- func (c *Cluster) GetClusterKind(kind string) *ActivatedKind
- func (c *Cluster) GetClusterKinds() []string
- func (c *Cluster) Logger() *slog.Logger
- func (c *Cluster) Publisher() Publisher
- func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error)
- func (c *Cluster) Shutdown(graceful bool)
- func (c *Cluster) StartClient()
- func (c *Cluster) StartMember()
- func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error)
- func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)
- func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error)
- type ClusterContextConfig
- type ClusterIdentity
- func (ci *ClusterIdentity) AsKey() string
- func (*ClusterIdentity) Descriptor() ([]byte, []int)deprecated
- func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID
- func (x *ClusterIdentity) GetIdentity() string
- func (x *ClusterIdentity) GetKind() string
- func (*ClusterIdentity) ProtoMessage()
- func (x *ClusterIdentity) ProtoReflect() protoreflect.Message
- func (x *ClusterIdentity) Reset()
- func (x *ClusterIdentity) String() string
- func (ci *ClusterIdentity) ToShortString() string
- type ClusterInit
- type ClusterProvider
- type ClusterTopology
- func (*ClusterTopology) Descriptor() ([]byte, []int)deprecated
- func (x *ClusterTopology) GetBlocked() []string
- func (x *ClusterTopology) GetJoined() []*Member
- func (x *ClusterTopology) GetLeft() []*Member
- func (x *ClusterTopology) GetMembers() []*Member
- func (x *ClusterTopology) GetTopologyHash() uint64
- func (*ClusterTopology) ProtoMessage()
- func (x *ClusterTopology) ProtoReflect() protoreflect.Message
- func (x *ClusterTopology) Reset()
- func (x *ClusterTopology) String() string
- type ClusterTopologyNotification
- func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)deprecated
- func (x *ClusterTopologyNotification) GetLeaderId() string
- func (x *ClusterTopologyNotification) GetMemberId() string
- func (x *ClusterTopologyNotification) GetTopologyHash() uint32
- func (*ClusterTopologyNotification) ProtoMessage()
- func (x *ClusterTopologyNotification) ProtoReflect() protoreflect.Message
- func (x *ClusterTopologyNotification) Reset()
- func (x *ClusterTopologyNotification) String() string
- type Config
- type ConfigOption
- func WithClusterContextProducer(producer ContextProducer) ConfigOption
- func WithHeartbeatExpiration(t time.Duration) ConfigOption
- func WithKinds(kinds ...*Kind) ConfigOption
- func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption
- func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption
- func WithRequestTimeout(t time.Duration) ConfigOption
- func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption
- type ConsensusCheck
- type ConsensusCheckBuilder
- func (ccb *ConsensusCheckBuilder) AffectedKeys() []string
- func (ccb *ConsensusCheckBuilder) Build() (ConsensusHandler, *ConsensusCheck)
- func (ccb *ConsensusCheckBuilder) Check() ConsensusChecker
- func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)
- func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)
- type ConsensusCheckDefinition
- type ConsensusChecker
- type ConsensusChecks
- type ConsensusHandler
- type Context
- type ContextProducer
- type DefaultContext
- type DeliverBatchRequest
- type DeliverBatchRequestTransport
- func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int)deprecated
- func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable
- func (x *DeliverBatchRequestTransport) GetBatch() *PubSubBatchTransport
- func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers
- func (x *DeliverBatchRequestTransport) GetTopic() string
- func (*DeliverBatchRequestTransport) ProtoMessage()
- func (x *DeliverBatchRequestTransport) ProtoReflect() protoreflect.Message
- func (x *DeliverBatchRequestTransport) Reset()
- func (x *DeliverBatchRequestTransport) String() string
- type DeliveryStatus
- func (DeliveryStatus) Descriptor() protoreflect.EnumDescriptor
- func (x DeliveryStatus) Enum() *DeliveryStatus
- func (DeliveryStatus) EnumDescriptor() ([]byte, []int)deprecated
- func (x DeliveryStatus) Number() protoreflect.EnumNumber
- func (x DeliveryStatus) String() string
- func (DeliveryStatus) Type() protoreflect.EnumType
- type EmptyKeyValueStore
- type GetGossipStateRequest
- type GetGossipStateResponse
- type GetPid
- type Gossip
- type GossipActor
- type GossipConsensusChecker
- type GossipCore
- type GossipDeltaValue
- func (*GossipDeltaValue) Descriptor() ([]byte, []int)deprecated
- func (x *GossipDeltaValue) GetEntries() []*GossipDeltaValue_GossipDeltaEntry
- func (*GossipDeltaValue) ProtoMessage()
- func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message
- func (x *GossipDeltaValue) Reset()
- func (x *GossipDeltaValue) String() string
- type GossipDeltaValue_GossipDeltaEntry
- func (*GossipDeltaValue_GossipDeltaEntry) Descriptor() ([]byte, []int)deprecated
- func (x *GossipDeltaValue_GossipDeltaEntry) GetData() []byte
- func (x *GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber() int64
- func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage()
- func (x *GossipDeltaValue_GossipDeltaEntry) ProtoReflect() protoreflect.Message
- func (x *GossipDeltaValue_GossipDeltaEntry) Reset()
- func (x *GossipDeltaValue_GossipDeltaEntry) String() string
- type GossipKeyValue
- func (*GossipKeyValue) Descriptor() ([]byte, []int)deprecated
- func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64
- func (x *GossipKeyValue) GetSequenceNumber() int64
- func (x *GossipKeyValue) GetValue() *anypb.Any
- func (*GossipKeyValue) ProtoMessage()
- func (x *GossipKeyValue) ProtoReflect() protoreflect.Message
- func (x *GossipKeyValue) Reset()
- func (x *GossipKeyValue) String() string
- type GossipKeyValues
- type GossipMemberState
- type GossipMemberStates
- type GossipRequest
- func (*GossipRequest) Descriptor() ([]byte, []int)deprecated
- func (x *GossipRequest) GetMemberId() string
- func (x *GossipRequest) GetState() *GossipState
- func (*GossipRequest) ProtoMessage()
- func (x *GossipRequest) ProtoReflect() protoreflect.Message
- func (x *GossipRequest) Reset()
- func (x *GossipRequest) String() string
- type GossipResponse
- type GossipState
- type GossipStateStorer
- type GossipState_GossipMemberState
- func (*GossipState_GossipMemberState) Descriptor() ([]byte, []int)deprecated
- func (x *GossipState_GossipMemberState) GetValues() map[string]*GossipKeyValue
- func (*GossipState_GossipMemberState) ProtoMessage()
- func (x *GossipState_GossipMemberState) ProtoReflect() protoreflect.Message
- func (x *GossipState_GossipMemberState) Reset()
- func (x *GossipState_GossipMemberState) String() string
- type GossipUpdate
- type GossipUpdater
- type Gossiper
- func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error)
- func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler
- func (g *Gossiper) SendState()
- func (g *Gossiper) SetState(key string, value proto.Message)
- func (g *Gossiper) SetStateRequest(key string, value proto.Message) error
- func (g *Gossiper) Shutdown()
- func (g *Gossiper) StartGossiping() error
- type GrainCallConfig
- type GrainCallOption
- type GrainContext
- type GrainErrorResponse
- type GrainRequest
- func (*GrainRequest) Descriptor() ([]byte, []int)deprecated
- func (x *GrainRequest) GetMessageData() []byte
- func (x *GrainRequest) GetMessageTypeName() string
- func (x *GrainRequest) GetMethodIndex() int32
- func (*GrainRequest) ProtoMessage()
- func (x *GrainRequest) ProtoReflect() protoreflect.Message
- func (x *GrainRequest) Reset()
- func (x *GrainRequest) String() string
- type GrainResponse
- func (*GrainResponse) Descriptor() ([]byte, []int)deprecated
- func (x *GrainResponse) GetMessageData() []byte
- func (x *GrainResponse) GetMessageTypeName() string
- func (*GrainResponse) ProtoMessage()
- func (x *GrainResponse) ProtoReflect() protoreflect.Message
- func (x *GrainResponse) Reset()
- func (x *GrainResponse) String() string
- type IdentityHandover
- func (*IdentityHandover) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandover) GetActors() []*Activation
- func (x *IdentityHandover) GetChunkId() int32
- func (x *IdentityHandover) GetFinal() bool
- func (x *IdentityHandover) GetSent() int32
- func (x *IdentityHandover) GetSkipped() int32
- func (x *IdentityHandover) GetTopologyHash() uint64
- func (*IdentityHandover) ProtoMessage()
- func (x *IdentityHandover) ProtoReflect() protoreflect.Message
- func (x *IdentityHandover) Reset()
- func (x *IdentityHandover) String() string
- type IdentityHandoverAck
- func (*IdentityHandoverAck) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverAck) GetChunkId() int32
- func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State
- func (x *IdentityHandoverAck) GetTopologyHash() uint64
- func (*IdentityHandoverAck) ProtoMessage()
- func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverAck) Reset()
- func (x *IdentityHandoverAck) String() string
- type IdentityHandoverAck_State
- func (IdentityHandoverAck_State) Descriptor() protoreflect.EnumDescriptor
- func (x IdentityHandoverAck_State) Enum() *IdentityHandoverAck_State
- func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)deprecated
- func (x IdentityHandoverAck_State) Number() protoreflect.EnumNumber
- func (x IdentityHandoverAck_State) String() string
- func (IdentityHandoverAck_State) Type() protoreflect.EnumType
- type IdentityHandoverRequest
- func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverRequest) GetAddress() string
- func (x *IdentityHandoverRequest) GetCurrentTopology() *IdentityHandoverRequest_Topology
- func (x *IdentityHandoverRequest) GetDeltaTopology() *IdentityHandoverRequest_Topology
- func (*IdentityHandoverRequest) ProtoMessage()
- func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverRequest) Reset()
- func (x *IdentityHandoverRequest) String() string
- type IdentityHandoverRequest_Topology
- func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)deprecated
- func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member
- func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64
- func (*IdentityHandoverRequest_Topology) ProtoMessage()
- func (x *IdentityHandoverRequest_Topology) ProtoReflect() protoreflect.Message
- func (x *IdentityHandoverRequest_Topology) Reset()
- func (x *IdentityHandoverRequest_Topology) String() string
- type IdentityLookup
- type IdentityStorageLookup
- type IdentityStorageWorker
- type Informer
- func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)
- func (inf *Informer) CheckConsensus(updatedKeys ...string)
- func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta
- func (inf *Informer) GetState(key string) map[string]*GossipKeyValue
- func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate
- func (inf *Informer) RemoveConsensusCheck(id string)
- func (inf *Informer) SendState(sendStateToMember LocalStateSender)
- func (inf *Informer) SetState(key string, message proto.Message)
- func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)
- type Initialize
- type InvalidOperationException
- type KeyValueStore
- type Kind
- type LocalStateSender
- type Member
- func (m *Member) Address() string
- func (*Member) Descriptor() ([]byte, []int)deprecated
- func (x *Member) GetHost() string
- func (x *Member) GetId() string
- func (x *Member) GetKinds() []string
- func (x *Member) GetPort() int32
- func (m *Member) HasKind(kind string) bool
- func (*Member) ProtoMessage()
- func (x *Member) ProtoReflect() protoreflect.Message
- func (x *Member) Reset()
- func (x *Member) String() string
- type MemberAvailableEvent
- type MemberHeartbeat
- func (*MemberHeartbeat) Descriptor() ([]byte, []int)deprecated
- func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics
- func (*MemberHeartbeat) ProtoMessage()
- func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message
- func (x *MemberHeartbeat) Reset()
- func (x *MemberHeartbeat) String() string
- type MemberJoinedEvent
- type MemberLeftEvent
- type MemberList
- func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)
- func (ml *MemberList) ContainsMemberID(memberID string) bool
- func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string
- func (ml *MemberList) InitializeTopologyConsensus()
- func (ml *MemberList) Length() int
- func (ml *MemberList) Members() *MemberSet
- func (ml *MemberList) TerminateMember(m *Member)
- func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)
- func (ml *MemberList) UpdateClusterTopology(members Members)
- type MemberMeta
- type MemberRejoinedEvent
- type MemberSet
- func (ms *MemberSet) ContainsID(id string) bool
- func (ms *MemberSet) Equals(other *MemberSet) bool
- func (ms *MemberSet) Except(other *MemberSet) *MemberSet
- func (ms *MemberSet) ExceptIds(ids []string) *MemberSet
- func (ms *MemberSet) GetMemberById(id string) *Member
- func (ms *MemberSet) Len() int
- func (ms *MemberSet) Members() Members
- func (ms *MemberSet) TopologyHash() uint64
- func (ms *MemberSet) Union(other *MemberSet) *MemberSet
- type MemberStateDelta
- type MemberStatus
- type MemberStatusEvent
- type MemberStrategy
- type MemberUnavailableEvent
- type Members
- type NotifyAboutFailingSubscribersRequest
- func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int)deprecated
- func (x *NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries() []*SubscriberDeliveryReport
- func (*NotifyAboutFailingSubscribersRequest) ProtoMessage()
- func (x *NotifyAboutFailingSubscribersRequest) ProtoReflect() protoreflect.Message
- func (x *NotifyAboutFailingSubscribersRequest) Reset()
- func (x *NotifyAboutFailingSubscribersRequest) String() string
- type NotifyAboutFailingSubscribersResponse
- func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int)deprecated
- func (*NotifyAboutFailingSubscribersResponse) ProtoMessage()
- func (x *NotifyAboutFailingSubscribersResponse) ProtoReflect() protoreflect.Message
- func (x *NotifyAboutFailingSubscribersResponse) Reset()
- func (x *NotifyAboutFailingSubscribersResponse) String() string
- type Option
- type PackedActivations
- func (*PackedActivations) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations) GetActors() []*PackedActivations_Kind
- func (x *PackedActivations) GetAddress() string
- func (*PackedActivations) ProtoMessage()
- func (x *PackedActivations) ProtoReflect() protoreflect.Message
- func (x *PackedActivations) Reset()
- func (x *PackedActivations) String() string
- type PackedActivations_Activation
- func (*PackedActivations_Activation) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations_Activation) GetActivationId() string
- func (x *PackedActivations_Activation) GetIdentity() string
- func (*PackedActivations_Activation) ProtoMessage()
- func (x *PackedActivations_Activation) ProtoReflect() protoreflect.Message
- func (x *PackedActivations_Activation) Reset()
- func (x *PackedActivations_Activation) String() string
- type PackedActivations_Kind
- func (*PackedActivations_Kind) Descriptor() ([]byte, []int)deprecated
- func (x *PackedActivations_Kind) GetActivations() []*PackedActivations_Activation
- func (x *PackedActivations_Kind) GetName() string
- func (*PackedActivations_Kind) ProtoMessage()
- func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message
- func (x *PackedActivations_Kind) Reset()
- func (x *PackedActivations_Kind) String() string
- type PidCacheValue
- func (c *PidCacheValue) Get(identity string, kind string) (*actor.PID, bool)
- func (c *PidCacheValue) Remove(identity string, kind string)
- func (c *PidCacheValue) RemoveByMember(member *Member)
- func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)
- func (c *PidCacheValue) Set(identity string, kind string, pid *actor.PID)
- type PidResult
- type ProduceProcessInfo
- type ProducerQueueFullException
- type ProxyActivationRequest
- func (*ProxyActivationRequest) Descriptor() ([]byte, []int)deprecated
- func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity
- func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID
- func (*ProxyActivationRequest) ProtoMessage()
- func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message
- func (x *ProxyActivationRequest) Reset()
- func (x *ProxyActivationRequest) String() string
- type PubSub
- type PubSubAutoRespondBatch
- type PubSubAutoRespondBatchTransport
- func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int)deprecated
- func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable
- func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope
- func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string
- func (*PubSubAutoRespondBatchTransport) ProtoMessage()
- func (x *PubSubAutoRespondBatchTransport) ProtoReflect() protoreflect.Message
- func (x *PubSubAutoRespondBatchTransport) Reset()
- func (x *PubSubAutoRespondBatchTransport) String() string
- type PubSubBatch
- type PubSubBatchTransport
- func (*PubSubBatchTransport) Descriptor() ([]byte, []int)deprecated
- func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable
- func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope
- func (x *PubSubBatchTransport) GetTypeNames() []string
- func (*PubSubBatchTransport) ProtoMessage()
- func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message
- func (x *PubSubBatchTransport) Reset()
- func (x *PubSubBatchTransport) String() string
- type PubSubConfig
- type PubSubEnvelope
- func (*PubSubEnvelope) Descriptor() ([]byte, []int)deprecated
- func (x *PubSubEnvelope) GetMessageData() []byte
- func (x *PubSubEnvelope) GetSerializerId() int32
- func (x *PubSubEnvelope) GetTypeId() int32
- func (*PubSubEnvelope) ProtoMessage()
- func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message
- func (x *PubSubEnvelope) Reset()
- func (x *PubSubEnvelope) String() string
- type PubSubMemberDeliveryActor
- func (p *PubSubMemberDeliveryActor) DeliverBatch(c actor.Context, batch *PubSubAutoRespondBatch, s *SubscriberIdentity) *actor.Future
- func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) *actor.Future
- func (p *PubSubMemberDeliveryActor) DeliverToPid(c actor.Context, batch *PubSubAutoRespondBatch, pid *actor.PID) *actor.Future
- func (p *PubSubMemberDeliveryActor) Receive(c actor.Context)
- type PublishResponse
- type PublishStatus
- func (PublishStatus) Descriptor() protoreflect.EnumDescriptor
- func (x PublishStatus) Enum() *PublishStatus
- func (PublishStatus) EnumDescriptor() ([]byte, []int)deprecated
- func (x PublishStatus) Number() protoreflect.EnumNumber
- func (x PublishStatus) String() string
- func (PublishStatus) Type() protoreflect.EnumType
- type Publisher
- type PublisherConfig
- type PublishingErrorDecision
- type PublishingErrorHandler
- type ReadyForRebalance
- func (*ReadyForRebalance) Descriptor() ([]byte, []int)deprecated
- func (x *ReadyForRebalance) GetTopologyHash() uint64
- func (*ReadyForRebalance) ProtoMessage()
- func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message
- func (x *ReadyForRebalance) Reset()
- func (x *ReadyForRebalance) String() string
- type RebalanceCompleted
- func (*RebalanceCompleted) Descriptor() ([]byte, []int)deprecated
- func (x *RebalanceCompleted) GetTopologyHash() uint64
- func (*RebalanceCompleted) ProtoMessage()
- func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message
- func (x *RebalanceCompleted) Reset()
- func (x *RebalanceCompleted) String() string
- type RemoteIdentityHandover
- func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)deprecated
- func (x *RemoteIdentityHandover) GetActors() *PackedActivations
- func (x *RemoteIdentityHandover) GetChunkId() int32
- func (x *RemoteIdentityHandover) GetFinal() bool
- func (x *RemoteIdentityHandover) GetSent() int32
- func (x *RemoteIdentityHandover) GetSkipped() int32
- func (x *RemoteIdentityHandover) GetTopologyHash() uint64
- func (*RemoteIdentityHandover) ProtoMessage()
- func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message
- func (x *RemoteIdentityHandover) Reset()
- func (x *RemoteIdentityHandover) String() string
- type RemoveConsensusCheck
- type Rendezvous
- type SendGossipStateRequest
- type SendGossipStateResponse
- type SetGossipStateKey
- type SetGossipStateResponse
- type SimpleRoundRobin
- type SpawnLock
- type StorageLookup
- type StoredActivation
- type SubscribeRequest
- func (*SubscribeRequest) Descriptor() ([]byte, []int)deprecated
- func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity
- func (*SubscribeRequest) ProtoMessage()
- func (x *SubscribeRequest) ProtoReflect() protoreflect.Message
- func (x *SubscribeRequest) Reset()
- func (x *SubscribeRequest) String() string
- type SubscribeResponse
- type SubscriberDeliveryReport
- func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int)deprecated
- func (x *SubscriberDeliveryReport) GetStatus() DeliveryStatus
- func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity
- func (*SubscriberDeliveryReport) ProtoMessage()
- func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message
- func (x *SubscriberDeliveryReport) Reset()
- func (x *SubscriberDeliveryReport) String() string
- type SubscriberIdentity
- func (*SubscriberIdentity) Descriptor() ([]byte, []int)deprecated
- func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity
- func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity
- func (x *SubscriberIdentity) GetPid() *actor.PID
- func (*SubscriberIdentity) ProtoMessage()
- func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message
- func (x *SubscriberIdentity) Reset()
- func (x *SubscriberIdentity) String() string
- type SubscriberIdentity_ClusterIdentity
- type SubscriberIdentity_Pid
- type Subscribers
- type TestMessage
- type TopicActor
- type UnsubscribeRequest
- func (*UnsubscribeRequest) Descriptor() ([]byte, []int)deprecated
- func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity
- func (*UnsubscribeRequest) ProtoMessage()
- func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message
- func (x *UnsubscribeRequest) Reset()
- func (x *UnsubscribeRequest) String() string
- type UnsubscribeResponse
Constants ¶
const ( TopologyKey string = "topology" HearthbeatKey string = "heathbeat" GracefullyLeftKey string = "left" )
const DefaultGossipActorName string = "gossip"
const PubSubDeliveryName = "$pubsub-delivery"
const TopicActorKind = "prototopic"
Variables ¶
var ( IdentityHandoverAck_State_name = map[int32]string{ 0: "processed", 1: "incorrect_topology", } IdentityHandoverAck_State_value = map[string]int32{ "processed": 0, "incorrect_topology": 1, } )
Enum value maps for IdentityHandoverAck_State.
var ( DeliveryStatus_name = map[int32]string{ 0: "Delivered", 1: "SubscriberNoLongerReachable", 2: "Timeout", 127: "OtherError", } DeliveryStatus_value = map[string]int32{ "Delivered": 0, "SubscriberNoLongerReachable": 1, "Timeout": 2, "OtherError": 127, } )
Enum value maps for DeliveryStatus.
var ( PublishStatus_name = map[int32]string{ 0: "Ok", 1: "Failed", } PublishStatus_value = map[string]int32{ "Ok": 0, "Failed": 1, } )
Enum value maps for PublishStatus.
var FailBatchAndContinue = NewPublishingErrorDecision(0)
FailBatchAndContinue skips the current batch and proceeds to the next one. The delivery reports (tasks) related to that batch are still failed with the exception that triggered the error handling.
var FailBatchAndStop = NewPublishingErrorDecision(0)
FailBatchAndStop causes the BatchingProducer to stop and fail the pending messages.
var File_cluster_proto protoreflect.FileDescriptor
var File_gossip_proto protoreflect.FileDescriptor
var File_grain_proto protoreflect.FileDescriptor
var File_pubsub_proto protoreflect.FileDescriptor
var File_pubsub_test_proto protoreflect.FileDescriptor
var RetryBatchImmediately = NewPublishingErrorDecision(0)
RetryBatchImmediately retries the current batch immediately.
Functions ¶
func MembersToMap ¶
func MembersToMap(members Members) map[string]*Member
func NewGossipConsensusHandler ¶
func NewGossipConsensusHandler() *gossipConsensusHandler
func RemotePlacementActor ¶
func RemotePlacementActor(address string) *actor.PID
RemotePlacementActor returns the PID of the remote placement actor.
func SetClusterIdentity ¶
func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)
func SortMembers ¶
func SortMembers(members Members)
func TopologyHash ¶
func TopologyHash(members Members) uint64
func WithClusterIdentity ¶
func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props
Types ¶
type Acknowledge ¶ added in v0.3.0
type Acknowledge struct {
// contains filtered or unexported fields
}
func (*Acknowledge) Descriptor ¶ deprecated, added in v0.3.0
func (*Acknowledge) Descriptor() ([]byte, []int)
Deprecated: Use Acknowledge.ProtoReflect.Descriptor instead.
func (*Acknowledge) ProtoMessage ¶ added in v0.3.0
func (*Acknowledge) ProtoMessage()
func (*Acknowledge) ProtoReflect ¶ added in v0.3.0
func (x *Acknowledge) ProtoReflect() protoreflect.Message
func (*Acknowledge) Reset ¶ added in v0.3.0
func (x *Acknowledge) Reset()
func (*Acknowledge) String ¶ added in v0.3.0
func (x *Acknowledge) String() string
type ActivatedKind ¶
type ActivatedKind struct { Kind string Props *actor.Props Strategy MemberStrategy // contains filtered or unexported fields }
func (*ActivatedKind) Dev ¶
func (ak *ActivatedKind) Dev()
func (*ActivatedKind) Inc ¶
func (ak *ActivatedKind) Inc()
type Activation ¶
type Activation struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
func (*Activation) Descriptor ¶ deprecated
func (*Activation) Descriptor() ([]byte, []int)
Deprecated: Use Activation.ProtoReflect.Descriptor instead.
func (*Activation) GetClusterIdentity ¶
func (x *Activation) GetClusterIdentity() *ClusterIdentity
func (*Activation) GetPid ¶
func (x *Activation) GetPid() *actor.PID
func (*Activation) ProtoMessage ¶
func (*Activation) ProtoMessage()
func (*Activation) ProtoReflect ¶
func (x *Activation) ProtoReflect() protoreflect.Message
func (*Activation) Reset ¶
func (x *Activation) Reset()
func (*Activation) String ¶
func (x *Activation) String() string
type ActivationRequest ¶
type ActivationRequest struct { ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` RequestId string `protobuf:"bytes,2,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` TopologyHash uint64 `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ActivationRequest) Descriptor ¶ deprecated
func (*ActivationRequest) Descriptor() ([]byte, []int)
Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead.
func (*ActivationRequest) GetClusterIdentity ¶
func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity
func (*ActivationRequest) GetRequestId ¶
func (x *ActivationRequest) GetRequestId() string
func (*ActivationRequest) GetTopologyHash ¶
func (x *ActivationRequest) GetTopologyHash() uint64
func (*ActivationRequest) ProtoMessage ¶
func (*ActivationRequest) ProtoMessage()
func (*ActivationRequest) ProtoReflect ¶
func (x *ActivationRequest) ProtoReflect() protoreflect.Message
func (*ActivationRequest) Reset ¶
func (x *ActivationRequest) Reset()
func (*ActivationRequest) String ¶
func (x *ActivationRequest) String() string
type ActivationResponse ¶
type ActivationResponse struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` Failed bool `protobuf:"varint,2,opt,name=failed,proto3" json:"failed,omitempty"` TopologyHash uint64 `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ActivationResponse) Descriptor
deprecated
func (*ActivationResponse) Descriptor() ([]byte, []int)
Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead.
func (*ActivationResponse) GetFailed ¶
func (x *ActivationResponse) GetFailed() bool
func (*ActivationResponse) GetPid ¶
func (x *ActivationResponse) GetPid() *actor.PID
func (*ActivationResponse) GetTopologyHash ¶
func (x *ActivationResponse) GetTopologyHash() uint64
func (*ActivationResponse) ProtoMessage ¶
func (*ActivationResponse) ProtoMessage()
func (*ActivationResponse) ProtoReflect ¶
func (x *ActivationResponse) ProtoReflect() protoreflect.Message
func (*ActivationResponse) Reset ¶
func (x *ActivationResponse) Reset()
func (*ActivationResponse) String ¶
func (x *ActivationResponse) String() string
type ActivationTerminated ¶
type ActivationTerminated struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
Terminated, removed from lookup
func (*ActivationTerminated) Descriptor
deprecated
func (*ActivationTerminated) Descriptor() ([]byte, []int)
Deprecated: Use ActivationTerminated.ProtoReflect.Descriptor instead.
func (*ActivationTerminated) GetClusterIdentity ¶
func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity
func (*ActivationTerminated) GetPid ¶
func (x *ActivationTerminated) GetPid() *actor.PID
func (*ActivationTerminated) ProtoMessage ¶
func (*ActivationTerminated) ProtoMessage()
func (*ActivationTerminated) ProtoReflect ¶
func (x *ActivationTerminated) ProtoReflect() protoreflect.Message
func (*ActivationTerminated) Reset ¶
func (x *ActivationTerminated) Reset()
func (*ActivationTerminated) String ¶
func (x *ActivationTerminated) String() string
type ActivationTerminating ¶
type ActivationTerminating struct { Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"` ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` // contains filtered or unexported fields }
Started terminating, not yet removed from IdentityLookup
func (*ActivationTerminating) Descriptor
deprecated
func (*ActivationTerminating) Descriptor() ([]byte, []int)
Deprecated: Use ActivationTerminating.ProtoReflect.Descriptor instead.
func (*ActivationTerminating) GetClusterIdentity ¶
func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity
func (*ActivationTerminating) GetPid ¶
func (x *ActivationTerminating) GetPid() *actor.PID
func (*ActivationTerminating) ProtoMessage ¶
func (*ActivationTerminating) ProtoMessage()
func (*ActivationTerminating) ProtoReflect ¶
func (x *ActivationTerminating) ProtoReflect() protoreflect.Message
func (*ActivationTerminating) Reset ¶
func (x *ActivationTerminating) Reset()
func (*ActivationTerminating) String ¶
func (x *ActivationTerminating) String() string
type ActorStatistics ¶
type ActorStatistics struct { ActorCount map[string]int64 `` /* 180-byte string literal not displayed */ // contains filtered or unexported fields }
func (*ActorStatistics) Descriptor
deprecated
func (*ActorStatistics) Descriptor() ([]byte, []int)
Deprecated: Use ActorStatistics.ProtoReflect.Descriptor instead.
func (*ActorStatistics) GetActorCount ¶
func (x *ActorStatistics) GetActorCount() map[string]int64
func (*ActorStatistics) ProtoMessage ¶
func (*ActorStatistics) ProtoMessage()
func (*ActorStatistics) ProtoReflect ¶
func (x *ActorStatistics) ProtoReflect() protoreflect.Message
func (*ActorStatistics) Reset ¶
func (x *ActorStatistics) Reset()
func (*ActorStatistics) String ¶
func (x *ActorStatistics) String() string
type AddConsensusCheck ¶
type AddConsensusCheck struct { ID string Check *ConsensusCheck }
func NewAddConsensusCheck ¶
func NewAddConsensusCheck(id string, check *ConsensusCheck) AddConsensusCheck
type BatchingProducer ¶ added in v0.3.0
type BatchingProducer struct {
// contains filtered or unexported fields
}
func NewBatchingProducer ¶ added in v0.3.0
func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
func (*BatchingProducer) Dispose ¶ added in v0.3.0
func (p *BatchingProducer) Dispose()
Dispose stops the producer and releases all resources.
func (*BatchingProducer) Produce ¶ added in v0.3.0
func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*ProduceProcessInfo, error)
Produce a message to producer queue. The return info can be used to wait for the message to be published.
type BatchingProducerConfig ¶ added in v0.3.0
type BatchingProducerConfig struct { // Maximum size of the published batch. Default: 2000. BatchSize int // Max size of the requests waiting in queue. If value is provided, the producer will throw // ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded // Note that bounded queue has better performance than unbounded queue. // Default: 0 (unbounded) MaxQueueSize int // How long to wait for the publishing to complete. // Default: 5s PublishTimeout time.Duration // Error handler that can decide what to do with an error when publishing a batch. // Default: Fail and stop the BatchingProducer OnPublishingError PublishingErrorHandler // A throttle for logging from this producer. By default, a throttle shared between all instances of // BatchingProducer is used, that allows for 10 events in 1 second. LogThrottle actor.ShouldThrottle // Optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean // up code to recover resources. PublisherIdleTimeout time.Duration }
type BatchingProducerConfigOption ¶ added in v0.3.0
type BatchingProducerConfigOption func(config *BatchingProducerConfig)
func WithBatchingProducerBatchSize ¶ added in v0.3.0
func WithBatchingProducerBatchSize(batchSize int) BatchingProducerConfigOption
WithBatchingProducerBatchSize sets maximum size of the published batch. Default: 2000.
func WithBatchingProducerLogThrottle ¶ added in v0.3.0
func WithBatchingProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchingProducerConfigOption
WithBatchingProducerLogThrottle sets a throttle for logging from this producer. By default, a throttle shared between all instances of BatchingProducer is used, that allows for 10 events in 10 seconds.
func WithBatchingProducerMaxQueueSize ¶ added in v0.3.0
func WithBatchingProducerMaxQueueSize(maxQueueSize int) BatchingProducerConfigOption
WithBatchingProducerMaxQueueSize sets the maximum number of requests waiting in the queue. If a value is provided, the producer fails with ProducerQueueFullException when the queue size is exceeded. If 0 or unset, the queue is unbounded. Note that a bounded queue has better performance than an unbounded queue. Default: 0 (unbounded)
func WithBatchingProducerOnPublishingError ¶ added in v0.3.0
func WithBatchingProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchingProducerConfigOption
WithBatchingProducerOnPublishingError sets error handler that can decide what to do with an error when publishing a batch. Default: Fail and stop the BatchingProducer
func WithBatchingProducerPublishTimeout ¶ added in v0.3.0
func WithBatchingProducerPublishTimeout(publishTimeout time.Duration) BatchingProducerConfigOption
WithBatchingProducerPublishTimeout sets how long to wait for the publishing to complete. Default: 5s
func WithBatchingProducerPublisherIdleTimeout ¶ added in v0.3.0
func WithBatchingProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchingProducerConfigOption
WithBatchingProducerPublisherIdleTimeout sets an optional idle timeout that tells the `Publisher` how long it should wait before invoking clean-up code to recover resources.
type Cluster ¶
type Cluster struct { ActorSystem *actor.ActorSystem Config *Config Gossip *Gossiper PubSub *PubSub Remote *remote.Remote PidCache *PidCacheValue MemberList *MemberList IdentityLookup IdentityLookup // contains filtered or unexported fields }
func GetCluster ¶
func GetCluster(actorSystem *actor.ActorSystem) *Cluster
func (*Cluster) BatchingProducer ¶ added in v0.3.0
func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigOption) *BatchingProducer
BatchingProducer creates a new PubSub batching producer for the specified topic that publishes directly to the topic actor.
func (*Cluster) ExtensionID ¶
func (c *Cluster) ExtensionID() extensions.ExtensionID
func (*Cluster) GetClusterKind ¶
func (c *Cluster) GetClusterKind(kind string) *ActivatedKind
func (*Cluster) GetClusterKinds ¶
func (*Cluster) Publisher ¶ added in v0.3.0
Publisher creates a new PubSub publisher that publishes messages directly to the TopicActor
func (*Cluster) Request ¶
func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error)
func (*Cluster) StartClient ¶
func (c *Cluster) StartClient()
func (*Cluster) StartMember ¶
func (c *Cluster) StartMember()
func (*Cluster) SubscribeByClusterIdentity ¶ added in v0.3.0
func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity
func (*Cluster) SubscribeByPid ¶ added in v0.3.0
func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeByPid subscribes to a PubSub topic by subscriber PID
func (*Cluster) SubscribeWithReceive ¶ added in v0.3.0
func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) (*SubscribeResponse, error)
SubscribeWithReceive subscribes to a PubSub topic by providing a Receive function that will be used to spawn a subscriber actor.
func (*Cluster) TryGetClusterKind ¶
func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)
func (*Cluster) UnsubscribeByClusterIdentity ¶ added in v0.3.0
func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity
func (*Cluster) UnsubscribeByIdentityAndKind ¶ added in v0.3.0
func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by identity and kind
func (*Cluster) UnsubscribeByPid ¶ added in v0.3.0
func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error)
UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID
type ClusterContextConfig ¶
type ClusterContextConfig struct { RequestsLogThrottlePeriod time.Duration MaxNumberOfEventsInRequestLogThrottledPeriod int // contains filtered or unexported fields }
ClusterContextConfig is used to configure cluster context parameters
type ClusterIdentity ¶
type ClusterIdentity struct { Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` Kind string `protobuf:"bytes,2,opt,name=kind,proto3" json:"kind,omitempty"` // contains filtered or unexported fields }
func GetClusterIdentity ¶
func GetClusterIdentity(ctx actor.ExtensionContext) *ClusterIdentity
func NewClusterIdentity ¶
func NewClusterIdentity(identity string, kind string) *ClusterIdentity
func (*ClusterIdentity) AsKey ¶
func (ci *ClusterIdentity) AsKey() string
func (*ClusterIdentity) Descriptor
deprecated
func (*ClusterIdentity) Descriptor() ([]byte, []int)
Deprecated: Use ClusterIdentity.ProtoReflect.Descriptor instead.
func (*ClusterIdentity) ExtensionID ¶
func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID
func (*ClusterIdentity) GetIdentity ¶
func (x *ClusterIdentity) GetIdentity() string
func (*ClusterIdentity) GetKind ¶
func (x *ClusterIdentity) GetKind() string
func (*ClusterIdentity) ProtoMessage ¶
func (*ClusterIdentity) ProtoMessage()
func (*ClusterIdentity) ProtoReflect ¶
func (x *ClusterIdentity) ProtoReflect() protoreflect.Message
func (*ClusterIdentity) Reset ¶
func (x *ClusterIdentity) Reset()
func (*ClusterIdentity) String ¶
func (x *ClusterIdentity) String() string
type ClusterInit ¶
type ClusterInit struct { Identity *ClusterIdentity Cluster *Cluster }
type ClusterProvider ¶
type ClusterTopology ¶
type ClusterTopology struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Members []*Member `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"` Joined []*Member `protobuf:"bytes,3,rep,name=joined,proto3" json:"joined,omitempty"` Left []*Member `protobuf:"bytes,4,rep,name=left,proto3" json:"left,omitempty"` Blocked []string `protobuf:"bytes,5,rep,name=blocked,proto3" json:"blocked,omitempty"` // contains filtered or unexported fields }
func (*ClusterTopology) Descriptor
deprecated
func (*ClusterTopology) Descriptor() ([]byte, []int)
Deprecated: Use ClusterTopology.ProtoReflect.Descriptor instead.
func (*ClusterTopology) GetBlocked ¶
func (x *ClusterTopology) GetBlocked() []string
func (*ClusterTopology) GetJoined ¶
func (x *ClusterTopology) GetJoined() []*Member
func (*ClusterTopology) GetLeft ¶
func (x *ClusterTopology) GetLeft() []*Member
func (*ClusterTopology) GetMembers ¶
func (x *ClusterTopology) GetMembers() []*Member
func (*ClusterTopology) GetTopologyHash ¶
func (x *ClusterTopology) GetTopologyHash() uint64
func (*ClusterTopology) ProtoMessage ¶
func (*ClusterTopology) ProtoMessage()
func (*ClusterTopology) ProtoReflect ¶
func (x *ClusterTopology) ProtoReflect() protoreflect.Message
func (*ClusterTopology) Reset ¶
func (x *ClusterTopology) Reset()
func (*ClusterTopology) String ¶
func (x *ClusterTopology) String() string
type ClusterTopologyNotification ¶
type ClusterTopologyNotification struct { MemberId string `protobuf:"bytes,1,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` TopologyHash uint32 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` LeaderId string `protobuf:"bytes,3,opt,name=leader_id,json=leaderId,proto3" json:"leader_id,omitempty"` // contains filtered or unexported fields }
func (*ClusterTopologyNotification) Descriptor
deprecated
func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)
Deprecated: Use ClusterTopologyNotification.ProtoReflect.Descriptor instead.
func (*ClusterTopologyNotification) GetLeaderId ¶
func (x *ClusterTopologyNotification) GetLeaderId() string
func (*ClusterTopologyNotification) GetMemberId ¶
func (x *ClusterTopologyNotification) GetMemberId() string
func (*ClusterTopologyNotification) GetTopologyHash ¶
func (x *ClusterTopologyNotification) GetTopologyHash() uint32
func (*ClusterTopologyNotification) ProtoMessage ¶
func (*ClusterTopologyNotification) ProtoMessage()
func (*ClusterTopologyNotification) ProtoReflect ¶
func (x *ClusterTopologyNotification) ProtoReflect() protoreflect.Message
func (*ClusterTopologyNotification) Reset ¶
func (x *ClusterTopologyNotification) Reset()
func (*ClusterTopologyNotification) String ¶
func (x *ClusterTopologyNotification) String() string
type Config ¶
type Config struct { Name string Address string ClusterProvider ClusterProvider IdentityLookup IdentityLookup RemoteConfig *remote.Config RequestTimeoutTime time.Duration RequestsLogThrottlePeriod time.Duration MaxNumberOfEventsInRequestLogThrottledPeriod int ClusterContextProducer ContextProducer MemberStrategyBuilder func(cluster *Cluster, kind string) MemberStrategy Kinds map[string]*Kind TimeoutTime time.Duration GossipInterval time.Duration GossipRequestTimeout time.Duration GossipFanOut int GossipMaxSend int HeartbeatExpiration time.Duration // Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the BlockList PubSubConfig *PubSubConfig }
func Configure ¶
func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config
func (*Config) ToClusterContextConfig ¶
func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig
ToClusterContextConfig converts the cluster context parameters of this Config into a valid ClusterContextConfig value and returns a pointer to it.
type ConfigOption ¶
type ConfigOption func(config *Config)
func WithClusterContextProducer ¶
func WithClusterContextProducer(producer ContextProducer) ConfigOption
WithClusterContextProducer sets the cluster context producer.
func WithHeartbeatExpiration ¶ added in v0.3.0
func WithHeartbeatExpiration(t time.Duration) ConfigOption
WithHeartbeatExpiration sets the gossip heartbeat expiration.
func WithKinds ¶
func WithKinds(kinds ...*Kind) ConfigOption
func WithMaxNumberOfEventsInRequestLogThrottlePeriod ¶
func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption
WithMaxNumberOfEventsInRequestLogThrottlePeriod sets the max number of events in request log throttled period.
func WithPubSubSubscriberTimeout ¶ added in v0.3.0
func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption
WithPubSubSubscriberTimeout sets a timeout used when delivering a message batch to a subscriber. Default is 5s.
func WithRequestTimeout ¶
func WithRequestTimeout(t time.Duration) ConfigOption
WithRequestTimeout sets the request timeout.
func WithRequestsLogThrottlePeriod ¶
func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption
WithRequestsLogThrottlePeriod sets the requests log throttle period.
type ConsensusCheck ¶
type ConsensusCheck struct {
// contains filtered or unexported fields
}
ConsensusCheck is a data structure that stores consensus check information and behavior.
func NewConsensusCheck ¶
func NewConsensusCheck(affectedKeys []string, check GossipUpdater) ConsensusCheck
NewConsensusCheck creates a new ConsensusCheck value with the given data and returns it.
type ConsensusCheckBuilder ¶
type ConsensusCheckBuilder struct {
// contains filtered or unexported fields
}
func (*ConsensusCheckBuilder) AffectedKeys ¶
func (ccb *ConsensusCheckBuilder) AffectedKeys() []string
func (*ConsensusCheckBuilder) Build ¶
func (ccb *ConsensusCheckBuilder) Build() (ConsensusHandler, *ConsensusCheck)
Build builds a new ConsensusHandler and ConsensusCheck values and returns pointers to them
func (*ConsensusCheckBuilder) Check ¶
func (ccb *ConsensusCheckBuilder) Check() ConsensusChecker
func (*ConsensusCheckBuilder) HasConsensus ¶
func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)
func (*ConsensusCheckBuilder) MapToValue ¶
func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)
type ConsensusCheckDefinition ¶
type ConsensusCheckDefinition interface { Check() *ConsensusCheck AffectedKeys() map[string]struct{} }
type ConsensusChecker ¶
type ConsensusChecker func(*GossipState, map[string]empty) (bool, interface{})
ConsensusChecker is a custom type used to provide consensus check callbacks of any type. Note: once generics are available (Go 1.18 and later), this is equivalent to:
type ConsensusChecker[T] func(GossipState, map[string]empty) (bool, T)
type ConsensusChecks ¶
type ConsensusChecks struct {
// contains filtered or unexported fields
}
ConsensusChecks acts as a store of pointers to ConsensusCheck values, indexed by key.
func NewConsensusChecks ¶
func NewConsensusChecks() *ConsensusChecks
NewConsensusChecks creates a new ConsensusChecks value and returns a pointer to it.
func (*ConsensusChecks) Add ¶
func (cc *ConsensusChecks) Add(id string, check *ConsensusCheck)
Add adds a new pointer to a ConsensusCheck value to the storage and registers it in the index of keys that affect it.
func (*ConsensusChecks) GetByUpdatedKey ¶
func (cc *ConsensusChecks) GetByUpdatedKey(key string) []*ConsensusCheck
GetByUpdatedKey iterates over all the keys stored in the set (map[string]empty) found for the given key and populates a slice of pointers to ConsensusCheck values, which is returned as the set of ConsensusCheck values updated by the given key.
func (*ConsensusChecks) GetByUpdatedKeys ¶
func (cc *ConsensusChecks) GetByUpdatedKeys(keys []string) []*ConsensusCheck
GetByUpdatedKeys iterates over all the keys stored in the sets (map[string]empty) found for the given keys and populates a slice of pointers to ConsensusCheck values, which is returned as the set of ConsensusCheck values updated by the given keys, with duplicates removed (as it is a "set").
func (*ConsensusChecks) Remove ¶
func (cc *ConsensusChecks) Remove(id string)
Remove removes the given ConsensusCheck identity from the storage and removes its affected by keys index if needed after cleaning.
type ConsensusHandler ¶
type Context ¶
type Context interface {
Request(identity string, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)
}
Context is an interface any cluster context needs to implement
type ContextProducer ¶
ContextProducer defines a type used to provide DefaultContext configurations / implementations.
type DefaultContext ¶
type DefaultContext struct {
// contains filtered or unexported fields
}
DefaultContext defines the default cluster context structure.
func (*DefaultContext) Request ¶
func (dcc *DefaultContext) Request(identity, kind string, message interface{}, opts ...GrainCallOption) (interface{}, error)
type DeliverBatchRequest ¶ added in v0.3.0
type DeliverBatchRequest struct { Subscribers *Subscribers PubSubBatch *PubSubBatch Topic string }
func (*DeliverBatchRequest) Serialize ¶ added in v0.3.0
func (d *DeliverBatchRequest) Serialize() remote.RootSerialized
type DeliverBatchRequestTransport ¶ added in v0.3.0
type DeliverBatchRequestTransport struct { Subscribers *Subscribers `protobuf:"bytes,1,opt,name=subscribers,proto3" json:"subscribers,omitempty"` Batch *PubSubBatchTransport `protobuf:"bytes,2,opt,name=batch,proto3" json:"batch,omitempty"` Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` // contains filtered or unexported fields }
Message sent from topic to delivery actor
func (*DeliverBatchRequestTransport) Descriptor
deprecated
added in
v0.3.0
func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int)
Deprecated: Use DeliverBatchRequestTransport.ProtoReflect.Descriptor instead.
func (*DeliverBatchRequestTransport) Deserialize ¶ added in v0.3.0
func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable
func (*DeliverBatchRequestTransport) GetBatch ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) GetBatch() *PubSubBatchTransport
func (*DeliverBatchRequestTransport) GetSubscribers ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers
func (*DeliverBatchRequestTransport) GetTopic ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) GetTopic() string
func (*DeliverBatchRequestTransport) ProtoMessage ¶ added in v0.3.0
func (*DeliverBatchRequestTransport) ProtoMessage()
func (*DeliverBatchRequestTransport) ProtoReflect ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) ProtoReflect() protoreflect.Message
func (*DeliverBatchRequestTransport) Reset ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) Reset()
func (*DeliverBatchRequestTransport) String ¶ added in v0.3.0
func (x *DeliverBatchRequestTransport) String() string
type DeliveryStatus ¶ added in v0.3.0
type DeliveryStatus int32
Delivery status as seen by the delivery actor
const ( // Message was put in the queue of the subscriber DeliveryStatus_Delivered DeliveryStatus = 0 // Message did not reach subscriber, because it was dead DeliveryStatus_SubscriberNoLongerReachable DeliveryStatus = 1 // Delivery timed out DeliveryStatus_Timeout DeliveryStatus = 2 // Some other problem happened DeliveryStatus_OtherError DeliveryStatus = 127 )
func (DeliveryStatus) Descriptor ¶ added in v0.3.0
func (DeliveryStatus) Descriptor() protoreflect.EnumDescriptor
func (DeliveryStatus) Enum ¶ added in v0.3.0
func (x DeliveryStatus) Enum() *DeliveryStatus
func (DeliveryStatus) EnumDescriptor
deprecated
added in
v0.3.0
func (DeliveryStatus) EnumDescriptor() ([]byte, []int)
Deprecated: Use DeliveryStatus.Descriptor instead.
func (DeliveryStatus) Number ¶ added in v0.3.0
func (x DeliveryStatus) Number() protoreflect.EnumNumber
func (DeliveryStatus) String ¶ added in v0.3.0
func (x DeliveryStatus) String() string
func (DeliveryStatus) Type ¶ added in v0.3.0
func (DeliveryStatus) Type() protoreflect.EnumType
type EmptyKeyValueStore ¶ added in v0.3.0
type EmptyKeyValueStore[T any] struct{}
EmptyKeyValueStore is a key value store that does nothing.
func (*EmptyKeyValueStore[T]) Clear ¶ added in v0.3.0
func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error
type GetGossipStateRequest ¶
type GetGossipStateRequest struct {
Key string
}
GetGossipStateRequest is used to query the GossipActor about the status of a given key.
func NewGetGossipStateRequest ¶
func NewGetGossipStateRequest(key string) GetGossipStateRequest
NewGetGossipStateRequest creates a new GetGossipStateRequest value and returns it.
type GetGossipStateResponse ¶
type GetGossipStateResponse struct {
State map[string]*GossipKeyValue
}
GetGossipStateResponse is used by the GossipActor to send back the status value of a given key.
func NewGetGossipStateResponse ¶
func NewGetGossipStateResponse(state map[string]*GossipKeyValue) GetGossipStateResponse
type Gossip ¶
type Gossip interface { GossipStateStorer GossipConsensusChecker GossipCore }
The Gossip interface must be implemented by any value that intends to participate in the Gossip protocol.
type GossipActor ¶
type GossipActor struct {
// contains filtered or unexported fields
}
Actor used to send gossip messages around
func NewGossipActor ¶
func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers func() set.Set[string], fanOut int, maxSend int, system *actor.ActorSystem) *GossipActor
NewGossipActor creates a new GossipActor and returns a pointer to it.
func (*GossipActor) ReceiveState ¶
func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)
type GossipConsensusChecker ¶
type GossipConsensusChecker interface { AddConsensusCheck(id string, check *ConsensusCheck) RemoveConsensusCheck(id string) }
This interface must be implemented by any value that wants to add or remove consensus checkers
type GossipCore ¶
type GossipCore interface { UpdateClusterTopology(topology *ClusterTopology) ReceiveState(remoteState *GossipState) []*GossipUpdate SendState(sendStateToMember LocalStateSender) GetMemberStateDelta(targetMemberID string) *MemberStateDelta }
This interface must be implemented by any value that wants to react to cluster topology events
type GossipDeltaValue ¶
type GossipDeltaValue struct { Entries []*GossipDeltaValue_GossipDeltaEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"` // contains filtered or unexported fields }
represents a value that can be sent in form of a delta change instead of a full value replace
func (*GossipDeltaValue) Descriptor
deprecated
func (*GossipDeltaValue) Descriptor() ([]byte, []int)
Deprecated: Use GossipDeltaValue.ProtoReflect.Descriptor instead.
func (*GossipDeltaValue) GetEntries ¶
func (x *GossipDeltaValue) GetEntries() []*GossipDeltaValue_GossipDeltaEntry
func (*GossipDeltaValue) ProtoMessage ¶
func (*GossipDeltaValue) ProtoMessage()
func (*GossipDeltaValue) ProtoReflect ¶
func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message
func (*GossipDeltaValue) Reset ¶
func (x *GossipDeltaValue) Reset()
func (*GossipDeltaValue) String ¶
func (x *GossipDeltaValue) String() string
type GossipDeltaValue_GossipDeltaEntry ¶
type GossipDeltaValue_GossipDeltaEntry struct { SequenceNumber int64 `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // contains filtered or unexported fields }
These are the entries of a delta value. This can be seen as an array with data, where each element in the array is tagged with a sequence number.
func (*GossipDeltaValue_GossipDeltaEntry) Descriptor
deprecated
func (*GossipDeltaValue_GossipDeltaEntry) Descriptor() ([]byte, []int)
Deprecated: Use GossipDeltaValue_GossipDeltaEntry.ProtoReflect.Descriptor instead.
func (*GossipDeltaValue_GossipDeltaEntry) GetData ¶
func (x *GossipDeltaValue_GossipDeltaEntry) GetData() []byte
func (*GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber ¶
func (x *GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber() int64
func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage ¶
func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage()
func (*GossipDeltaValue_GossipDeltaEntry) ProtoReflect ¶
func (x *GossipDeltaValue_GossipDeltaEntry) ProtoReflect() protoreflect.Message
func (*GossipDeltaValue_GossipDeltaEntry) Reset ¶
func (x *GossipDeltaValue_GossipDeltaEntry) Reset()
func (*GossipDeltaValue_GossipDeltaEntry) String ¶
func (x *GossipDeltaValue_GossipDeltaEntry) String() string
type GossipKeyValue ¶
type GossipKeyValue struct { SequenceNumber int64 `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member Value *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` //value is any format LocalTimestampUnixMilliseconds int64 `` /* 156-byte string literal not displayed */ // contains filtered or unexported fields }
A known key might be heartbeat. If we locally tag each entry with a local timestamp, we can measure whether we have not received a new heartbeat from a member in some time, even if we don't know the exact time the heartbeat was issued, due to clock differences. We still know when _we_, as in this node, got this data, and we can measure time from then until now.
If we got a heartbeat from another node and X seconds pass, we can assume it to be dead.
func (*GossipKeyValue) Descriptor
deprecated
func (*GossipKeyValue) Descriptor() ([]byte, []int)
Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead.
func (*GossipKeyValue) GetLocalTimestampUnixMilliseconds ¶
func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64
func (*GossipKeyValue) GetSequenceNumber ¶
func (x *GossipKeyValue) GetSequenceNumber() int64
func (*GossipKeyValue) GetValue ¶
func (x *GossipKeyValue) GetValue() *anypb.Any
func (*GossipKeyValue) ProtoMessage ¶
func (*GossipKeyValue) ProtoMessage()
func (*GossipKeyValue) ProtoReflect ¶
func (x *GossipKeyValue) ProtoReflect() protoreflect.Message
func (*GossipKeyValue) Reset ¶
func (x *GossipKeyValue) Reset()
func (*GossipKeyValue) String ¶
func (x *GossipKeyValue) String() string
type GossipKeyValues ¶
type GossipKeyValues = map[string]*GossipKeyValue
type GossipMemberState ¶
type GossipMemberState = GossipState_GossipMemberState
convenience type alias
type GossipMemberStates ¶
type GossipMemberStates = map[string]*GossipMemberState
type GossipRequest ¶
type GossipRequest struct { MemberId string `protobuf:"bytes,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` // contains filtered or unexported fields }
func (*GossipRequest) Descriptor
deprecated
func (*GossipRequest) Descriptor() ([]byte, []int)
Deprecated: Use GossipRequest.ProtoReflect.Descriptor instead.
func (*GossipRequest) GetMemberId ¶
func (x *GossipRequest) GetMemberId() string
func (*GossipRequest) GetState ¶
func (x *GossipRequest) GetState() *GossipState
func (*GossipRequest) ProtoMessage ¶
func (*GossipRequest) ProtoMessage()
func (*GossipRequest) ProtoReflect ¶
func (x *GossipRequest) ProtoReflect() protoreflect.Message
func (*GossipRequest) Reset ¶
func (x *GossipRequest) Reset()
func (*GossipRequest) String ¶
func (x *GossipRequest) String() string
type GossipResponse ¶
type GossipResponse struct { State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` // contains filtered or unexported fields }
Ack a gossip request
func (*GossipResponse) Descriptor
deprecated
func (*GossipResponse) Descriptor() ([]byte, []int)
Deprecated: Use GossipResponse.ProtoReflect.Descriptor instead.
func (*GossipResponse) GetState ¶
func (x *GossipResponse) GetState() *GossipState
func (*GossipResponse) ProtoMessage ¶
func (*GossipResponse) ProtoMessage()
func (*GossipResponse) ProtoReflect ¶
func (x *GossipResponse) ProtoReflect() protoreflect.Message
func (*GossipResponse) Reset ¶
func (x *GossipResponse) Reset()
func (*GossipResponse) String ¶
func (x *GossipResponse) String() string
type GossipState ¶
type GossipState struct { Members map[string]*GossipState_GossipMemberState `` /* 155-byte string literal not displayed */ // contains filtered or unexported fields }
Two GossipState objects can be merged: each key + member_id pair gets its own entry; on collision, the highest version is selected.
func (*GossipState) Descriptor
deprecated
func (*GossipState) Descriptor() ([]byte, []int)
Deprecated: Use GossipState.ProtoReflect.Descriptor instead.
func (*GossipState) GetMembers ¶
func (x *GossipState) GetMembers() map[string]*GossipState_GossipMemberState
func (*GossipState) ProtoMessage ¶
func (*GossipState) ProtoMessage()
func (*GossipState) ProtoReflect ¶
func (x *GossipState) ProtoReflect() protoreflect.Message
func (*GossipState) Reset ¶
func (x *GossipState) Reset()
func (*GossipState) String ¶
func (x *GossipState) String() string
type GossipStateStorer ¶
type GossipStateStorer interface { GetState(key string) map[string]*GossipKeyValue SetState(key string, value proto.Message) }
This interface must be implemented by any value that wants to be used as a gossip state storage.
type GossipState_GossipMemberState ¶
type GossipState_GossipMemberState struct { Values map[string]*GossipKeyValue `` /* 153-byte string literal not displayed */ // contains filtered or unexported fields }
func (*GossipState_GossipMemberState) Descriptor
deprecated
func (*GossipState_GossipMemberState) Descriptor() ([]byte, []int)
Deprecated: Use GossipState_GossipMemberState.ProtoReflect.Descriptor instead.
func (*GossipState_GossipMemberState) GetValues ¶
func (x *GossipState_GossipMemberState) GetValues() map[string]*GossipKeyValue
func (*GossipState_GossipMemberState) ProtoMessage ¶
func (*GossipState_GossipMemberState) ProtoMessage()
func (*GossipState_GossipMemberState) ProtoReflect ¶
func (x *GossipState_GossipMemberState) ProtoReflect() protoreflect.Message
func (*GossipState_GossipMemberState) Reset ¶
func (x *GossipState_GossipMemberState) Reset()
func (*GossipState_GossipMemberState) String ¶
func (x *GossipState_GossipMemberState) String() string
type GossipUpdate ¶
GossipUpdate is used to update gossip data when a ClusterTopology event occurs.
type GossipUpdater ¶
type GossipUpdater func(*GossipState, map[string]empty)
type Gossiper ¶
type Gossiper struct { // The Gossiper Actor Name, defaults to "gossip" GossipActorName string // contains filtered or unexported fields }
The Gossiper data structure manages Gossip
func (*Gossiper) GetState ¶
func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error)
func (*Gossiper) RegisterConsensusCheck ¶
func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler
RegisterConsensusCheck builds a consensus handler and a consensus checker, sends the checker to the Gossip actor, and returns the handler back to the caller.
func (*Gossiper) SetStateRequest ¶
SetStateRequest Sends a Request (that blocks) to update member state
func (*Gossiper) StartGossiping ¶
type GrainCallConfig ¶
type GrainCallConfig struct { RetryCount int Timeout time.Duration RetryAction func(n int) int Context actor.SenderContext }
func DefaultGrainCallConfig ¶
func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig
func NewGrainCallOptions ¶
func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig
type GrainCallOption ¶
type GrainCallOption func(config *GrainCallConfig)
func WithContext ¶
func WithContext(ctx actor.SenderContext) GrainCallOption
func WithRetryAction ¶
func WithRetryAction(act func(i int) int) GrainCallOption
func WithRetryCount ¶ added in v0.3.0
func WithRetryCount(count int) GrainCallOption
func WithTimeout ¶
func WithTimeout(timeout time.Duration) GrainCallOption
type GrainContext ¶
func NewGrainContext ¶
func NewGrainContext(context actor.Context, identity *ClusterIdentity, cluster *Cluster) GrainContext
type GrainErrorResponse ¶
type GrainErrorResponse struct { Err string `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"` // contains filtered or unexported fields }
func (*GrainErrorResponse) Descriptor
deprecated
func (*GrainErrorResponse) Descriptor() ([]byte, []int)
Deprecated: Use GrainErrorResponse.ProtoReflect.Descriptor instead.
func (*GrainErrorResponse) GetErr ¶
func (x *GrainErrorResponse) GetErr() string
func (*GrainErrorResponse) ProtoMessage ¶
func (*GrainErrorResponse) ProtoMessage()
func (*GrainErrorResponse) ProtoReflect ¶
func (x *GrainErrorResponse) ProtoReflect() protoreflect.Message
func (*GrainErrorResponse) Reset ¶
func (x *GrainErrorResponse) Reset()
func (*GrainErrorResponse) String ¶
func (x *GrainErrorResponse) String() string
type GrainRequest ¶
type GrainRequest struct { MethodIndex int32 `protobuf:"varint,1,opt,name=method_index,json=methodIndex,proto3" json:"method_index,omitempty"` MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` MessageTypeName string `protobuf:"bytes,3,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"` // contains filtered or unexported fields }
func (*GrainRequest) Descriptor
deprecated
func (*GrainRequest) Descriptor() ([]byte, []int)
Deprecated: Use GrainRequest.ProtoReflect.Descriptor instead.
func (*GrainRequest) GetMessageData ¶
func (x *GrainRequest) GetMessageData() []byte
func (*GrainRequest) GetMessageTypeName ¶
func (x *GrainRequest) GetMessageTypeName() string
func (*GrainRequest) GetMethodIndex ¶
func (x *GrainRequest) GetMethodIndex() int32
func (*GrainRequest) ProtoMessage ¶
func (*GrainRequest) ProtoMessage()
func (*GrainRequest) ProtoReflect ¶
func (x *GrainRequest) ProtoReflect() protoreflect.Message
func (*GrainRequest) Reset ¶
func (x *GrainRequest) Reset()
func (*GrainRequest) String ¶
func (x *GrainRequest) String() string
type GrainResponse ¶
type GrainResponse struct { MessageData []byte `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` MessageTypeName string `protobuf:"bytes,2,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"` // contains filtered or unexported fields }
func (*GrainResponse) Descriptor
deprecated
func (*GrainResponse) Descriptor() ([]byte, []int)
Deprecated: Use GrainResponse.ProtoReflect.Descriptor instead.
func (*GrainResponse) GetMessageData ¶
func (x *GrainResponse) GetMessageData() []byte
func (*GrainResponse) GetMessageTypeName ¶
func (x *GrainResponse) GetMessageTypeName() string
func (*GrainResponse) ProtoMessage ¶
func (*GrainResponse) ProtoMessage()
func (*GrainResponse) ProtoReflect ¶
func (x *GrainResponse) ProtoReflect() protoreflect.Message
func (*GrainResponse) Reset ¶
func (x *GrainResponse) Reset()
func (*GrainResponse) String ¶
func (x *GrainResponse) String() string
type IdentityHandover ¶
type IdentityHandover struct { Actors []*Activation `protobuf:"bytes,1,rep,name=actors,proto3" json:"actors,omitempty"` ChunkId int32 `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` Final bool `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"` TopologyHash uint64 `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Skipped int32 `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` // Total number of activations skipped Sent int32 `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"` // Total number of activations sent // contains filtered or unexported fields }
func (*IdentityHandover) Descriptor
deprecated
func (*IdentityHandover) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandover.ProtoReflect.Descriptor instead.
func (*IdentityHandover) GetActors ¶
func (x *IdentityHandover) GetActors() []*Activation
func (*IdentityHandover) GetChunkId ¶
func (x *IdentityHandover) GetChunkId() int32
func (*IdentityHandover) GetFinal ¶
func (x *IdentityHandover) GetFinal() bool
func (*IdentityHandover) GetSent ¶
func (x *IdentityHandover) GetSent() int32
func (*IdentityHandover) GetSkipped ¶
func (x *IdentityHandover) GetSkipped() int32
func (*IdentityHandover) GetTopologyHash ¶
func (x *IdentityHandover) GetTopologyHash() uint64
func (*IdentityHandover) ProtoMessage ¶
func (*IdentityHandover) ProtoMessage()
func (*IdentityHandover) ProtoReflect ¶
func (x *IdentityHandover) ProtoReflect() protoreflect.Message
func (*IdentityHandover) Reset ¶
func (x *IdentityHandover) Reset()
func (*IdentityHandover) String ¶
func (x *IdentityHandover) String() string
type IdentityHandoverAck ¶
type IdentityHandoverAck struct { ChunkId int32 `protobuf:"varint,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` TopologyHash uint64 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` ProcessingState IdentityHandoverAck_State `` /* 146-byte string literal not displayed */ // contains filtered or unexported fields }
func (*IdentityHandoverAck) Descriptor
deprecated
func (*IdentityHandoverAck) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverAck.ProtoReflect.Descriptor instead.
func (*IdentityHandoverAck) GetChunkId ¶
func (x *IdentityHandoverAck) GetChunkId() int32
func (*IdentityHandoverAck) GetProcessingState ¶
func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State
func (*IdentityHandoverAck) GetTopologyHash ¶
func (x *IdentityHandoverAck) GetTopologyHash() uint64
func (*IdentityHandoverAck) ProtoMessage ¶
func (*IdentityHandoverAck) ProtoMessage()
func (*IdentityHandoverAck) ProtoReflect ¶
func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message
func (*IdentityHandoverAck) Reset ¶
func (x *IdentityHandoverAck) Reset()
func (*IdentityHandoverAck) String ¶
func (x *IdentityHandoverAck) String() string
type IdentityHandoverAck_State ¶
type IdentityHandoverAck_State int32
const ( IdentityHandoverAck_processed IdentityHandoverAck_State = 0 IdentityHandoverAck_incorrect_topology IdentityHandoverAck_State = 1 )
func (IdentityHandoverAck_State) Descriptor ¶
func (IdentityHandoverAck_State) Descriptor() protoreflect.EnumDescriptor
func (IdentityHandoverAck_State) Enum ¶
func (x IdentityHandoverAck_State) Enum() *IdentityHandoverAck_State
func (IdentityHandoverAck_State) EnumDescriptor
deprecated
func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverAck_State.Descriptor instead.
func (IdentityHandoverAck_State) Number ¶
func (x IdentityHandoverAck_State) Number() protoreflect.EnumNumber
func (IdentityHandoverAck_State) String ¶
func (x IdentityHandoverAck_State) String() string
func (IdentityHandoverAck_State) Type ¶
func (IdentityHandoverAck_State) Type() protoreflect.EnumType
type IdentityHandoverRequest ¶
type IdentityHandoverRequest struct { CurrentTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,1,opt,name=current_topology,json=currentTopology,proto3" json:"current_topology,omitempty"` Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` // If the requester passes a delta topology, only return activations which would not be assigned to the member // in the previous topology. DeltaTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,3,opt,name=delta_topology,json=deltaTopology,proto3" json:"delta_topology,omitempty"` // contains filtered or unexported fields }
request response call from Identity actor sent to each member asking what activations they hold that belong to the requester
func (*IdentityHandoverRequest) Descriptor
deprecated
func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverRequest.ProtoReflect.Descriptor instead.
func (*IdentityHandoverRequest) GetAddress ¶
func (x *IdentityHandoverRequest) GetAddress() string
func (*IdentityHandoverRequest) GetCurrentTopology ¶
func (x *IdentityHandoverRequest) GetCurrentTopology() *IdentityHandoverRequest_Topology
func (*IdentityHandoverRequest) GetDeltaTopology ¶
func (x *IdentityHandoverRequest) GetDeltaTopology() *IdentityHandoverRequest_Topology
func (*IdentityHandoverRequest) ProtoMessage ¶
func (*IdentityHandoverRequest) ProtoMessage()
func (*IdentityHandoverRequest) ProtoReflect ¶
func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message
func (*IdentityHandoverRequest) Reset ¶
func (x *IdentityHandoverRequest) Reset()
func (*IdentityHandoverRequest) String ¶
func (x *IdentityHandoverRequest) String() string
type IdentityHandoverRequest_Topology ¶
type IdentityHandoverRequest_Topology struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Members []*Member `protobuf:"bytes,3,rep,name=members,proto3" json:"members,omitempty"` // contains filtered or unexported fields }
func (*IdentityHandoverRequest_Topology) Descriptor
deprecated
func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)
Deprecated: Use IdentityHandoverRequest_Topology.ProtoReflect.Descriptor instead.
func (*IdentityHandoverRequest_Topology) GetMembers ¶
func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member
func (*IdentityHandoverRequest_Topology) GetTopologyHash ¶
func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64
func (*IdentityHandoverRequest_Topology) ProtoMessage ¶
func (*IdentityHandoverRequest_Topology) ProtoMessage()
func (*IdentityHandoverRequest_Topology) ProtoReflect ¶
func (x *IdentityHandoverRequest_Topology) ProtoReflect() protoreflect.Message
func (*IdentityHandoverRequest_Topology) Reset ¶
func (x *IdentityHandoverRequest_Topology) Reset()
func (*IdentityHandoverRequest_Topology) String ¶
func (x *IdentityHandoverRequest_Topology) String() string
type IdentityLookup ¶
type IdentityLookup interface { Get(clusterIdentity *ClusterIdentity) *actor.PID RemovePid(clusterIdentity *ClusterIdentity, pid *actor.PID) Setup(cluster *Cluster, kinds []string, isClient bool) Shutdown() }
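IdentityLookup contains the operations for resolving a ClusterIdentity to a PID (Get), removing a PID (RemovePid), and the Setup and Shutdown lifecycle calls.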
type IdentityStorageLookup ¶
type IdentityStorageLookup struct { Storage StorageLookup // contains filtered or unexported fields }
IdentityStorageLookup contains the StorageLookup (the Storage field) along with unexported state.
func (*IdentityStorageLookup) Get ¶
func (id *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID
Get returns a PID for a given ClusterIdentity
func (*IdentityStorageLookup) RemoveMember ¶
func (i *IdentityStorageLookup) RemoveMember(memberID string)
RemoveMember from identity storage
type IdentityStorageWorker ¶
type IdentityStorageWorker struct {
// contains filtered or unexported fields
}
func (*IdentityStorageWorker) Receive ¶
func (ids *IdentityStorageWorker) Receive(c actor.Context)
Receive func
type Informer ¶
type Informer struct {
// contains filtered or unexported fields
}
The Informer data structure implements the Gossip interface
func (*Informer) AddConsensusCheck ¶
func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)
adds a new consensus checker to this informer
func (*Informer) CheckConsensus ¶
check consensus for the given keys
func (*Informer) GetMemberStateDelta ¶
func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta
func (*Informer) GetState ¶
func (inf *Informer) GetState(key string) map[string]*GossipKeyValue
Retrieves this informer's current state for the given key. Returns a map containing each known member id and its value.
func (*Informer) ReceiveState ¶
func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate
receives a remote informer state
func (*Informer) RemoveConsensusCheck ¶
removes a consensus checker from this informer
func (*Informer) SendState ¶
func (inf *Informer) SendState(sendStateToMember LocalStateSender)
Sends this informer's local state to remote informers chosen randomly from the slice of other members known by this informer, until gossipFanOut sends have been made.
func (*Informer) UpdateClusterTopology ¶
func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)
called when there is a cluster topology update
type Initialize ¶ added in v0.3.0
type Initialize struct { IdleTimeout *durationpb.Duration `protobuf:"bytes,1,opt,name=idleTimeout,proto3" json:"idleTimeout,omitempty"` // contains filtered or unexported fields }
First request to initialize the actor.
func (*Initialize) Descriptor
deprecated
added in
v0.3.0
func (*Initialize) Descriptor() ([]byte, []int)
Deprecated: Use Initialize.ProtoReflect.Descriptor instead.
func (*Initialize) GetIdleTimeout ¶ added in v0.3.0
func (x *Initialize) GetIdleTimeout() *durationpb.Duration
func (*Initialize) ProtoMessage ¶ added in v0.3.0
func (*Initialize) ProtoMessage()
func (*Initialize) ProtoReflect ¶ added in v0.3.0
func (x *Initialize) ProtoReflect() protoreflect.Message
func (*Initialize) Reset ¶ added in v0.3.0
func (x *Initialize) Reset()
func (*Initialize) String ¶ added in v0.3.0
func (x *Initialize) String() string
type InvalidOperationException ¶ added in v0.3.0
type InvalidOperationException struct {
Topic string
}
func (*InvalidOperationException) Error ¶ added in v0.3.0
func (i *InvalidOperationException) Error() string
func (*InvalidOperationException) Is ¶ added in v0.3.0
func (i *InvalidOperationException) Is(err error) bool
type KeyValueStore ¶ added in v0.3.0
type KeyValueStore[T any] interface { // Set the value for the given key. Set(ctx context.Context, key string, value T) error // Get the value for the given key. Get(ctx context.Context, key string) (T, error) // Clear the value for the given key. Clear(ctx context.Context, key string) error }
KeyValueStore is a distributed key value store
type Kind ¶
type Kind struct { Kind string Props *actor.Props StrategyBuilder func(*Cluster) MemberStrategy }
Kind represents the kinds of actors a cluster can manage
func (*Kind) Build ¶
func (k *Kind) Build(cluster *Cluster) *ActivatedKind
func (*Kind) WithMemberStrategy ¶
func (k *Kind) WithMemberStrategy(strategyBuilder func(*Cluster) MemberStrategy)
type LocalStateSender ¶
type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)
Custom type that defines a state sender callback.
type Member ¶
type Member struct { Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` Port int32 `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"` Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` Kinds []string `protobuf:"bytes,4,rep,name=kinds,proto3" json:"kinds,omitempty"` // contains filtered or unexported fields }
func (*Member) Descriptor
deprecated
func (*Member) ProtoMessage ¶
func (*Member) ProtoMessage()
func (*Member) ProtoReflect ¶
func (x *Member) ProtoReflect() protoreflect.Message
type MemberAvailableEvent ¶
type MemberAvailableEvent struct {
MemberMeta
}
func (*MemberAvailableEvent) MemberStatusEvent ¶
func (*MemberAvailableEvent) MemberStatusEvent()
type MemberHeartbeat ¶
type MemberHeartbeat struct { ActorStatistics *ActorStatistics `protobuf:"bytes,1,opt,name=actor_statistics,json=actorStatistics,proto3" json:"actor_statistics,omitempty"` // contains filtered or unexported fields }
func (*MemberHeartbeat) Descriptor
deprecated
func (*MemberHeartbeat) Descriptor() ([]byte, []int)
Deprecated: Use MemberHeartbeat.ProtoReflect.Descriptor instead.
func (*MemberHeartbeat) GetActorStatistics ¶
func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics
func (*MemberHeartbeat) ProtoMessage ¶
func (*MemberHeartbeat) ProtoMessage()
func (*MemberHeartbeat) ProtoReflect ¶
func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message
func (*MemberHeartbeat) Reset ¶
func (x *MemberHeartbeat) Reset()
func (*MemberHeartbeat) String ¶
func (x *MemberHeartbeat) String() string
type MemberJoinedEvent ¶
type MemberJoinedEvent struct {
MemberMeta
}
func (*MemberJoinedEvent) MemberStatusEvent ¶
func (*MemberJoinedEvent) MemberStatusEvent()
type MemberLeftEvent ¶
type MemberLeftEvent struct {
MemberMeta
}
func (*MemberLeftEvent) MemberStatusEvent ¶
func (*MemberLeftEvent) MemberStatusEvent()
type MemberList ¶
type MemberList struct {
// contains filtered or unexported fields
}
MemberList is responsible for keeping track of the current cluster topology. It does so by listening to changes from the ClusterProvider. The default ClusterProvider is consul.ConsulProvider, which uses the Consul HTTP API to scan for changes.
func NewMemberList ¶
func NewMemberList(cluster *Cluster) *MemberList
func (*MemberList) BroadcastEvent ¶
func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)
func (*MemberList) ContainsMemberID ¶
func (ml *MemberList) ContainsMemberID(memberID string) bool
func (*MemberList) GetActivatorMember ¶
func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string
func (*MemberList) InitializeTopologyConsensus ¶
func (ml *MemberList) InitializeTopologyConsensus()
func (*MemberList) Length ¶
func (ml *MemberList) Length() int
func (*MemberList) Members ¶
func (ml *MemberList) Members() *MemberSet
func (*MemberList) TerminateMember ¶
func (ml *MemberList) TerminateMember(m *Member)
func (*MemberList) TopologyConsensus ¶
func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)
func (*MemberList) UpdateClusterTopology ¶
func (ml *MemberList) UpdateClusterTopology(members Members)
type MemberMeta ¶
func (*MemberMeta) GetKinds ¶
func (e *MemberMeta) GetKinds() []string
func (*MemberMeta) Name ¶
func (e *MemberMeta) Name() string
type MemberRejoinedEvent ¶
type MemberRejoinedEvent struct {
MemberMeta
}
func (*MemberRejoinedEvent) MemberStatusEvent ¶
func (*MemberRejoinedEvent) MemberStatusEvent()
type MemberSet ¶
type MemberSet struct {
// contains filtered or unexported fields
}
func NewMemberSet ¶
func (*MemberSet) ContainsID ¶
func (*MemberSet) GetMemberById ¶
func (*MemberSet) TopologyHash ¶
type MemberStateDelta ¶
type MemberStateDelta struct { TargetMemberID string HasState bool State *GossipState CommitOffsets func() }
type MemberStatus ¶
func (*MemberStatus) Address ¶
func (m *MemberStatus) Address() string
type MemberStatusEvent ¶
type MemberStatusEvent interface { MemberStatusEvent() GetKinds() []string }
type MemberStrategy ¶
type MemberUnavailableEvent ¶
type MemberUnavailableEvent struct {
}
func (*MemberUnavailableEvent) MemberStatusEvent ¶
func (*MemberUnavailableEvent) MemberStatusEvent()
type NotifyAboutFailingSubscribersRequest ¶ added in v0.3.0
type NotifyAboutFailingSubscribersRequest struct { InvalidDeliveries []*SubscriberDeliveryReport `protobuf:"bytes,1,rep,name=invalid_deliveries,json=invalidDeliveries,proto3" json:"invalid_deliveries,omitempty"` // contains filtered or unexported fields }
Message sent from delivery actor to topic to notify of subscribers that fail to process the messages
func (*NotifyAboutFailingSubscribersRequest) Descriptor
deprecated
added in
v0.3.0
func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int)
Deprecated: Use NotifyAboutFailingSubscribersRequest.ProtoReflect.Descriptor instead.
func (*NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries() []*SubscriberDeliveryReport
func (*NotifyAboutFailingSubscribersRequest) ProtoMessage ¶ added in v0.3.0
func (*NotifyAboutFailingSubscribersRequest) ProtoMessage()
func (*NotifyAboutFailingSubscribersRequest) ProtoReflect ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersRequest) ProtoReflect() protoreflect.Message
func (*NotifyAboutFailingSubscribersRequest) Reset ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersRequest) Reset()
func (*NotifyAboutFailingSubscribersRequest) String ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersRequest) String() string
type NotifyAboutFailingSubscribersResponse ¶ added in v0.3.0
type NotifyAboutFailingSubscribersResponse struct {
// contains filtered or unexported fields
}
Ack to the delivery actor after notification of subscribers that fail to process the messages
func (*NotifyAboutFailingSubscribersResponse) Descriptor
deprecated
added in
v0.3.0
func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int)
Deprecated: Use NotifyAboutFailingSubscribersResponse.ProtoReflect.Descriptor instead.
func (*NotifyAboutFailingSubscribersResponse) ProtoMessage ¶ added in v0.3.0
func (*NotifyAboutFailingSubscribersResponse) ProtoMessage()
func (*NotifyAboutFailingSubscribersResponse) ProtoReflect ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersResponse) ProtoReflect() protoreflect.Message
func (*NotifyAboutFailingSubscribersResponse) Reset ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersResponse) Reset()
func (*NotifyAboutFailingSubscribersResponse) String ¶ added in v0.3.0
func (x *NotifyAboutFailingSubscribersResponse) String() string
type PackedActivations ¶
type PackedActivations struct { Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` Actors []*PackedActivations_Kind `protobuf:"bytes,2,rep,name=actors,proto3" json:"actors,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations) Descriptor
deprecated
func (*PackedActivations) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations.ProtoReflect.Descriptor instead.
func (*PackedActivations) GetActors ¶
func (x *PackedActivations) GetActors() []*PackedActivations_Kind
func (*PackedActivations) GetAddress ¶
func (x *PackedActivations) GetAddress() string
func (*PackedActivations) ProtoMessage ¶
func (*PackedActivations) ProtoMessage()
func (*PackedActivations) ProtoReflect ¶
func (x *PackedActivations) ProtoReflect() protoreflect.Message
func (*PackedActivations) Reset ¶
func (x *PackedActivations) Reset()
func (*PackedActivations) String ¶
func (x *PackedActivations) String() string
type PackedActivations_Activation ¶
type PackedActivations_Activation struct { Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"` ActivationId string `protobuf:"bytes,2,opt,name=activation_id,json=activationId,proto3" json:"activation_id,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations_Activation) Descriptor
deprecated
func (*PackedActivations_Activation) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations_Activation.ProtoReflect.Descriptor instead.
func (*PackedActivations_Activation) GetActivationId ¶
func (x *PackedActivations_Activation) GetActivationId() string
func (*PackedActivations_Activation) GetIdentity ¶
func (x *PackedActivations_Activation) GetIdentity() string
func (*PackedActivations_Activation) ProtoMessage ¶
func (*PackedActivations_Activation) ProtoMessage()
func (*PackedActivations_Activation) ProtoReflect ¶
func (x *PackedActivations_Activation) ProtoReflect() protoreflect.Message
func (*PackedActivations_Activation) Reset ¶
func (x *PackedActivations_Activation) Reset()
func (*PackedActivations_Activation) String ¶
func (x *PackedActivations_Activation) String() string
type PackedActivations_Kind ¶
type PackedActivations_Kind struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Activations []*PackedActivations_Activation `protobuf:"bytes,2,rep,name=activations,proto3" json:"activations,omitempty"` // contains filtered or unexported fields }
func (*PackedActivations_Kind) Descriptor
deprecated
func (*PackedActivations_Kind) Descriptor() ([]byte, []int)
Deprecated: Use PackedActivations_Kind.ProtoReflect.Descriptor instead.
func (*PackedActivations_Kind) GetActivations ¶
func (x *PackedActivations_Kind) GetActivations() []*PackedActivations_Activation
func (*PackedActivations_Kind) GetName ¶
func (x *PackedActivations_Kind) GetName() string
func (*PackedActivations_Kind) ProtoMessage ¶
func (*PackedActivations_Kind) ProtoMessage()
func (*PackedActivations_Kind) ProtoReflect ¶
func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message
func (*PackedActivations_Kind) Reset ¶
func (x *PackedActivations_Kind) Reset()
func (*PackedActivations_Kind) String ¶
func (x *PackedActivations_Kind) String() string
type PidCacheValue ¶
type PidCacheValue struct {
// contains filtered or unexported fields
}
func NewPidCache ¶
func NewPidCache() *PidCacheValue
func (*PidCacheValue) Remove ¶
func (c *PidCacheValue) Remove(identity string, kind string)
func (*PidCacheValue) RemoveByMember ¶
func (c *PidCacheValue) RemoveByMember(member *Member)
func (*PidCacheValue) RemoveByValue ¶
func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)
type ProduceProcessInfo ¶ added in v0.3.0
type ProduceProcessInfo struct { Finished chan struct{} Err error // contains filtered or unexported fields }
ProduceProcessInfo is the context for a Produce call
func (*ProduceProcessInfo) IsCancelled ¶ added in v0.3.0
func (p *ProduceProcessInfo) IsCancelled() bool
IsCancelled returns true if the context has been cancelled
func (*ProduceProcessInfo) IsFinished ¶ added in v0.3.0
func (p *ProduceProcessInfo) IsFinished() bool
IsFinished returns true if the context has been finished
type ProducerQueueFullException ¶ added in v0.3.0
type ProducerQueueFullException struct {
// contains filtered or unexported fields
}
func (*ProducerQueueFullException) Error ¶ added in v0.3.0
func (p *ProducerQueueFullException) Error() string
func (*ProducerQueueFullException) Is ¶ added in v0.3.0
func (p *ProducerQueueFullException) Is(target error) bool
type ProxyActivationRequest ¶
type ProxyActivationRequest struct { ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"` ReplacedActivation *actor.PID `protobuf:"bytes,2,opt,name=replaced_activation,json=replacedActivation,proto3" json:"replaced_activation,omitempty"` // contains filtered or unexported fields }
func (*ProxyActivationRequest) Descriptor
deprecated
func (*ProxyActivationRequest) Descriptor() ([]byte, []int)
Deprecated: Use ProxyActivationRequest.ProtoReflect.Descriptor instead.
func (*ProxyActivationRequest) GetClusterIdentity ¶
func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity
func (*ProxyActivationRequest) GetReplacedActivation ¶
func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID
func (*ProxyActivationRequest) ProtoMessage ¶
func (*ProxyActivationRequest) ProtoMessage()
func (*ProxyActivationRequest) ProtoReflect ¶
func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message
func (*ProxyActivationRequest) Reset ¶
func (x *ProxyActivationRequest) Reset()
func (*ProxyActivationRequest) String ¶
func (x *ProxyActivationRequest) String() string
type PubSub ¶ added in v0.3.0
type PubSub struct {
// contains filtered or unexported fields
}
func GetPubSub ¶ added in v0.3.0
func GetPubSub(system *actor.ActorSystem) *PubSub
GetPubSub returns the PubSub extension from the actor system
func (*PubSub) ExtensionID ¶ added in v0.3.0
func (p *PubSub) ExtensionID() extensions.ExtensionID
type PubSubAutoRespondBatch ¶ added in v0.3.0
type PubSubAutoRespondBatch struct {
Envelopes []interface{}
}
func (*PubSubAutoRespondBatch) GetAutoResponse ¶ added in v0.3.0
func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{}
GetAutoResponse returns a PublishResponse.
func (*PubSubAutoRespondBatch) GetMessages ¶ added in v0.3.0
func (b *PubSubAutoRespondBatch) GetMessages() []interface{}
GetMessages returns the messages.
func (*PubSubAutoRespondBatch) Serialize ¶ added in v0.3.0
func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized
Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
type PubSubAutoRespondBatchTransport ¶ added in v0.3.0
type PubSubAutoRespondBatchTransport struct { TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` // contains filtered or unexported fields }
Message posted to a subscriber's mailbox, which is then unrolled into single messages and has the ability to auto-respond. See also PubSubAutoRespondBatch.
func (*PubSubAutoRespondBatchTransport) Descriptor
deprecated
added in
v0.3.0
func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int)
Deprecated: Use PubSubAutoRespondBatchTransport.ProtoReflect.Descriptor instead.
func (*PubSubAutoRespondBatchTransport) Deserialize ¶ added in v0.3.0
func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable
Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
func (*PubSubAutoRespondBatchTransport) GetEnvelopes ¶ added in v0.3.0
func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope
func (*PubSubAutoRespondBatchTransport) GetTypeNames ¶ added in v0.3.0
func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string
func (*PubSubAutoRespondBatchTransport) ProtoMessage ¶ added in v0.3.0
func (*PubSubAutoRespondBatchTransport) ProtoMessage()
func (*PubSubAutoRespondBatchTransport) ProtoReflect ¶ added in v0.3.0
func (x *PubSubAutoRespondBatchTransport) ProtoReflect() protoreflect.Message
func (*PubSubAutoRespondBatchTransport) Reset ¶ added in v0.3.0
func (x *PubSubAutoRespondBatchTransport) Reset()
func (*PubSubAutoRespondBatchTransport) String ¶ added in v0.3.0
func (x *PubSubAutoRespondBatchTransport) String() string
type PubSubBatch ¶ added in v0.3.0
type PubSubBatch struct {
Envelopes []interface{}
}
func (*PubSubBatch) Serialize ¶ added in v0.3.0
func (b *PubSubBatch) Serialize() remote.RootSerialized
Serialize converts a PubSubBatch to a PubSubBatchTransport.
type PubSubBatchTransport ¶ added in v0.3.0
type PubSubBatchTransport struct { TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` // contains filtered or unexported fields }
Message sent from publisher to topic actor. See also PubSubBatch.
func (*PubSubBatchTransport) Descriptor
deprecated
added in
v0.3.0
func (*PubSubBatchTransport) Descriptor() ([]byte, []int)
Deprecated: Use PubSubBatchTransport.ProtoReflect.Descriptor instead.
func (*PubSubBatchTransport) Deserialize ¶ added in v0.3.0
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable
Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (*PubSubBatchTransport) GetEnvelopes ¶ added in v0.3.0
func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope
func (*PubSubBatchTransport) GetTypeNames ¶ added in v0.3.0
func (x *PubSubBatchTransport) GetTypeNames() []string
func (*PubSubBatchTransport) ProtoMessage ¶ added in v0.3.0
func (*PubSubBatchTransport) ProtoMessage()
func (*PubSubBatchTransport) ProtoReflect ¶ added in v0.3.0
func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message
func (*PubSubBatchTransport) Reset ¶ added in v0.3.0
func (x *PubSubBatchTransport) Reset()
func (*PubSubBatchTransport) String ¶ added in v0.3.0
func (x *PubSubBatchTransport) String() string
type PubSubConfig ¶ added in v0.3.0
type PubSubConfig struct { // SubscriberTimeout is a timeout used when delivering a message batch to a subscriber. Default is 5s. // // This value gets rounded to seconds for optimization of cancellation token creation. Note that internally, // cluster request is used to deliver messages to ClusterIdentity subscribers. SubscriberTimeout time.Duration }
type PubSubEnvelope ¶ added in v0.3.0
type PubSubEnvelope struct { TypeId int32 `protobuf:"varint,1,opt,name=type_id,json=typeId,proto3" json:"type_id,omitempty"` MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` SerializerId int32 `protobuf:"varint,3,opt,name=serializer_id,json=serializerId,proto3" json:"serializer_id,omitempty"` // contains filtered or unexported fields }
Contains message byte representation and type reference
func (*PubSubEnvelope) Descriptor
deprecated
added in
v0.3.0
func (*PubSubEnvelope) Descriptor() ([]byte, []int)
Deprecated: Use PubSubEnvelope.ProtoReflect.Descriptor instead.
func (*PubSubEnvelope) GetMessageData ¶ added in v0.3.0
func (x *PubSubEnvelope) GetMessageData() []byte
func (*PubSubEnvelope) GetSerializerId ¶ added in v0.3.0
func (x *PubSubEnvelope) GetSerializerId() int32
func (*PubSubEnvelope) GetTypeId ¶ added in v0.3.0
func (x *PubSubEnvelope) GetTypeId() int32
func (*PubSubEnvelope) ProtoMessage ¶ added in v0.3.0
func (*PubSubEnvelope) ProtoMessage()
func (*PubSubEnvelope) ProtoReflect ¶ added in v0.3.0
func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message
func (*PubSubEnvelope) Reset ¶ added in v0.3.0
func (x *PubSubEnvelope) Reset()
func (*PubSubEnvelope) String ¶ added in v0.3.0
func (x *PubSubEnvelope) String() string
type PubSubMemberDeliveryActor ¶ added in v0.3.0
type PubSubMemberDeliveryActor struct {
// contains filtered or unexported fields
}
func NewPubSubMemberDeliveryActor ¶ added in v0.3.0
func NewPubSubMemberDeliveryActor(subscriberTimeout time.Duration, logger *slog.Logger) *PubSubMemberDeliveryActor
func (*PubSubMemberDeliveryActor) DeliverBatch ¶ added in v0.3.0
func (p *PubSubMemberDeliveryActor) DeliverBatch(c actor.Context, batch *PubSubAutoRespondBatch, s *SubscriberIdentity) *actor.Future
DeliverBatch delivers PubSubAutoRespondBatch to SubscriberIdentity.
func (*PubSubMemberDeliveryActor) DeliverToClusterIdentity ¶ added in v0.3.0
func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) *actor.Future
DeliverToClusterIdentity delivers PubSubAutoRespondBatch to ClusterIdentity.
func (*PubSubMemberDeliveryActor) DeliverToPid ¶ added in v0.3.0
func (p *PubSubMemberDeliveryActor) DeliverToPid(c actor.Context, batch *PubSubAutoRespondBatch, pid *actor.PID) *actor.Future
DeliverToPid delivers PubSubAutoRespondBatch to PID.
func (*PubSubMemberDeliveryActor) Receive ¶ added in v0.3.0
func (p *PubSubMemberDeliveryActor) Receive(c actor.Context)
type PublishResponse ¶ added in v0.3.0
type PublishResponse struct { // Status of the whole published batch or single message Status PublishStatus `protobuf:"varint,1,opt,name=status,proto3,enum=cluster.PublishStatus" json:"status,omitempty"` // contains filtered or unexported fields }
Publish ack/nack response
func (*PublishResponse) Descriptor
deprecated
added in
v0.3.0
func (*PublishResponse) Descriptor() ([]byte, []int)
Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.
func (*PublishResponse) GetStatus ¶ added in v0.3.0
func (x *PublishResponse) GetStatus() PublishStatus
func (*PublishResponse) ProtoMessage ¶ added in v0.3.0
func (*PublishResponse) ProtoMessage()
func (*PublishResponse) ProtoReflect ¶ added in v0.3.0
func (x *PublishResponse) ProtoReflect() protoreflect.Message
func (*PublishResponse) Reset ¶ added in v0.3.0
func (x *PublishResponse) Reset()
func (*PublishResponse) String ¶ added in v0.3.0
func (x *PublishResponse) String() string
type PublishStatus ¶ added in v0.3.0
type PublishStatus int32
Status of the whole published batch or single message
const ( // Batch or message was successfully published according to the delivery guarantees PublishStatus_Ok PublishStatus = 0 // Topic failed to forward the message PublishStatus_Failed PublishStatus = 1 )
func (PublishStatus) Descriptor ¶ added in v0.3.0
func (PublishStatus) Descriptor() protoreflect.EnumDescriptor
func (PublishStatus) Enum ¶ added in v0.3.0
func (x PublishStatus) Enum() *PublishStatus
func (PublishStatus) EnumDescriptor
deprecated
added in
v0.3.0
func (PublishStatus) EnumDescriptor() ([]byte, []int)
Deprecated: Use PublishStatus.Descriptor instead.
func (PublishStatus) Number ¶ added in v0.3.0
func (x PublishStatus) Number() protoreflect.EnumNumber
func (PublishStatus) String ¶ added in v0.3.0
func (x PublishStatus) String() string
func (PublishStatus) Type ¶ added in v0.3.0
func (PublishStatus) Type() protoreflect.EnumType
type Publisher ¶ added in v0.3.0
type Publisher interface { // Initialize the internal mechanisms of this publisher. Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) // PublishBatch publishes a batch of messages to the topic. PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error) // Publish publishes a single message to the topic. Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) Logger() *slog.Logger }
func NewPublisher ¶ added in v0.3.0
type PublisherConfig ¶ added in v0.3.0
type PublishingErrorDecision ¶ added in v0.3.0
func NewPublishingErrorDecision ¶ added in v0.3.0
func NewPublishingErrorDecision(delay time.Duration) *PublishingErrorDecision
NewPublishingErrorDecision creates a new PublishingErrorDecision
func RetryBatchAfter ¶ added in v0.3.0
func RetryBatchAfter(delay time.Duration) *PublishingErrorDecision
RetryBatchAfter returns a new PublishingErrorDecision with the Delay set to the given duration
type PublishingErrorHandler ¶ added in v0.3.0
type PublishingErrorHandler func(retries int, e error, batch *PubSubBatch) *PublishingErrorDecision
PublishingErrorHandler decides what to do with a publishing error in BatchingProducer
type ReadyForRebalance ¶
type ReadyForRebalance struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*ReadyForRebalance) Descriptor
deprecated
func (*ReadyForRebalance) Descriptor() ([]byte, []int)
Deprecated: Use ReadyForRebalance.ProtoReflect.Descriptor instead.
func (*ReadyForRebalance) GetTopologyHash ¶
func (x *ReadyForRebalance) GetTopologyHash() uint64
func (*ReadyForRebalance) ProtoMessage ¶
func (*ReadyForRebalance) ProtoMessage()
func (*ReadyForRebalance) ProtoReflect ¶
func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message
func (*ReadyForRebalance) Reset ¶
func (x *ReadyForRebalance) Reset()
func (*ReadyForRebalance) String ¶
func (x *ReadyForRebalance) String() string
type RebalanceCompleted ¶
type RebalanceCompleted struct { TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` // contains filtered or unexported fields }
func (*RebalanceCompleted) Descriptor
deprecated
func (*RebalanceCompleted) Descriptor() ([]byte, []int)
Deprecated: Use RebalanceCompleted.ProtoReflect.Descriptor instead.
func (*RebalanceCompleted) GetTopologyHash ¶
func (x *RebalanceCompleted) GetTopologyHash() uint64
func (*RebalanceCompleted) ProtoMessage ¶
func (*RebalanceCompleted) ProtoMessage()
func (*RebalanceCompleted) ProtoReflect ¶
func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message
func (*RebalanceCompleted) Reset ¶
func (x *RebalanceCompleted) Reset()
func (*RebalanceCompleted) String ¶
func (x *RebalanceCompleted) String() string
type RemoteIdentityHandover ¶
type RemoteIdentityHandover struct { Actors *PackedActivations `protobuf:"bytes,1,opt,name=actors,proto3" json:"actors,omitempty"` ChunkId int32 `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"` Final bool `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"` TopologyHash uint64 `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"` Skipped int32 `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` Sent int32 `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"` // contains filtered or unexported fields }
func (*RemoteIdentityHandover) Descriptor
deprecated
func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)
Deprecated: Use RemoteIdentityHandover.ProtoReflect.Descriptor instead.
func (*RemoteIdentityHandover) GetActors ¶
func (x *RemoteIdentityHandover) GetActors() *PackedActivations
func (*RemoteIdentityHandover) GetChunkId ¶
func (x *RemoteIdentityHandover) GetChunkId() int32
func (*RemoteIdentityHandover) GetFinal ¶
func (x *RemoteIdentityHandover) GetFinal() bool
func (*RemoteIdentityHandover) GetSent ¶
func (x *RemoteIdentityHandover) GetSent() int32
func (*RemoteIdentityHandover) GetSkipped ¶
func (x *RemoteIdentityHandover) GetSkipped() int32
func (*RemoteIdentityHandover) GetTopologyHash ¶
func (x *RemoteIdentityHandover) GetTopologyHash() uint64
func (*RemoteIdentityHandover) ProtoMessage ¶
func (*RemoteIdentityHandover) ProtoMessage()
func (*RemoteIdentityHandover) ProtoReflect ¶
func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message
func (*RemoteIdentityHandover) Reset ¶
func (x *RemoteIdentityHandover) Reset()
func (*RemoteIdentityHandover) String ¶
func (x *RemoteIdentityHandover) String() string
type RemoveConsensusCheck ¶
type RemoveConsensusCheck struct {
ID string
}
Mimic .NET ReenterAfterCancellation on GossipActor
func NewRemoveConsensusCheck ¶
func NewRemoveConsensusCheck(id string) RemoveConsensusCheck
type Rendezvous ¶
type Rendezvous struct {
// contains filtered or unexported fields
}
func NewRendezvous ¶
func NewRendezvous() *Rendezvous
func (*Rendezvous) GetByClusterIdentity ¶
func (r *Rendezvous) GetByClusterIdentity(ci *ClusterIdentity) string
func (*Rendezvous) GetByIdentity ¶
func (r *Rendezvous) GetByIdentity(identity string) string
func (*Rendezvous) UpdateMembers ¶
func (r *Rendezvous) UpdateMembers(members Members)
type SendGossipStateRequest ¶
type SendGossipStateRequest struct{}
type SendGossipStateResponse ¶
type SendGossipStateResponse struct{}
type SetGossipStateKey ¶
Used to set up Gossip Status Keys in the GossipActor
func NewGossipStateKey ¶
func NewGossipStateKey(key string, value proto.Message) SetGossipStateKey
Creates a new SetGossipStateKey value with the given data and returns it
type SetGossipStateResponse ¶
type SetGossipStateResponse struct{}
Used by the GossipActor to respond to SetGossipStatus requests
type SimpleRoundRobin ¶
type SimpleRoundRobin struct {
// contains filtered or unexported fields
}
func NewSimpleRoundRobin ¶
func NewSimpleRoundRobin(memberStrategy MemberStrategy) *SimpleRoundRobin
func (*SimpleRoundRobin) GetByRoundRobin ¶
func (r *SimpleRoundRobin) GetByRoundRobin() string
type SpawnLock ¶
type SpawnLock struct { LockID string ClusterIdentity *ClusterIdentity }
SpawnLock contains a LockID and a ClusterIdentity.
type StorageLookup ¶
type StorageLookup interface { TryGetExistingActivation(clusterIdentity *ClusterIdentity) *StoredActivation TryAcquireLock(clusterIdentity *ClusterIdentity) *SpawnLock WaitForActivation(clusterIdentity *ClusterIdentity) *StoredActivation RemoveLock(spawnLock SpawnLock) StoreActivation(memberID string, spawnLock *SpawnLock, pid *actor.PID) RemoveActivation(pid *SpawnLock) RemoveMemberId(memberID string) }
StorageLookup contains the methods for looking up, locking, storing, and removing activations.
type StoredActivation ¶
StoredActivation contains
type SubscribeRequest ¶ added in v0.3.0
type SubscribeRequest struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` // contains filtered or unexported fields }
Sent to topic actor to add a subscriber
func (*SubscribeRequest) Descriptor
deprecated
added in
v0.3.0
func (*SubscribeRequest) Descriptor() ([]byte, []int)
Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) GetSubscriber ¶ added in v0.3.0
func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity
func (*SubscribeRequest) ProtoMessage ¶ added in v0.3.0
func (*SubscribeRequest) ProtoMessage()
func (*SubscribeRequest) ProtoReflect ¶ added in v0.3.0
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message
func (*SubscribeRequest) Reset ¶ added in v0.3.0
func (x *SubscribeRequest) Reset()
func (*SubscribeRequest) String ¶ added in v0.3.0
func (x *SubscribeRequest) String() string
type SubscribeResponse ¶ added in v0.3.0
type SubscribeResponse struct {
// contains filtered or unexported fields
}
Subscribe acknowledgement
func (*SubscribeResponse) Descriptor
deprecated
added in
v0.3.0
func (*SubscribeResponse) Descriptor() ([]byte, []int)
Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) ProtoMessage ¶ added in v0.3.0
func (*SubscribeResponse) ProtoMessage()
func (*SubscribeResponse) ProtoReflect ¶ added in v0.3.0
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message
func (*SubscribeResponse) Reset ¶ added in v0.3.0
func (x *SubscribeResponse) Reset()
func (*SubscribeResponse) String ¶ added in v0.3.0
func (x *SubscribeResponse) String() string
type SubscriberDeliveryReport ¶ added in v0.3.0
type SubscriberDeliveryReport struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` Status DeliveryStatus `protobuf:"varint,2,opt,name=status,proto3,enum=cluster.DeliveryStatus" json:"status,omitempty"` // contains filtered or unexported fields }
Contains information about a failed delivery
func (*SubscriberDeliveryReport) Descriptor
deprecated
added in
v0.3.0
func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int)
Deprecated: Use SubscriberDeliveryReport.ProtoReflect.Descriptor instead.
func (*SubscriberDeliveryReport) GetStatus ¶ added in v0.3.0
func (x *SubscriberDeliveryReport) GetStatus() DeliveryStatus
func (*SubscriberDeliveryReport) GetSubscriber ¶ added in v0.3.0
func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity
func (*SubscriberDeliveryReport) ProtoMessage ¶ added in v0.3.0
func (*SubscriberDeliveryReport) ProtoMessage()
func (*SubscriberDeliveryReport) ProtoReflect ¶ added in v0.3.0
func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message
func (*SubscriberDeliveryReport) Reset ¶ added in v0.3.0
func (x *SubscriberDeliveryReport) Reset()
func (*SubscriberDeliveryReport) String ¶ added in v0.3.0
func (x *SubscriberDeliveryReport) String() string
type SubscriberIdentity ¶ added in v0.3.0
type SubscriberIdentity struct { // Types that are assignable to Identity: // *SubscriberIdentity_Pid // *SubscriberIdentity_ClusterIdentity Identity isSubscriberIdentity_Identity `protobuf_oneof:"Identity"` // contains filtered or unexported fields }
Identifies a subscriber by either ClusterIdentity or PID
func (*SubscriberIdentity) Descriptor
deprecated
added in
v0.3.0
func (*SubscriberIdentity) Descriptor() ([]byte, []int)
Deprecated: Use SubscriberIdentity.ProtoReflect.Descriptor instead.
func (*SubscriberIdentity) GetClusterIdentity ¶ added in v0.3.0
func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity
func (*SubscriberIdentity) GetIdentity ¶ added in v0.3.0
func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity
func (*SubscriberIdentity) GetPid ¶ added in v0.3.0
func (x *SubscriberIdentity) GetPid() *actor.PID
func (*SubscriberIdentity) ProtoMessage ¶ added in v0.3.0
func (*SubscriberIdentity) ProtoMessage()
func (*SubscriberIdentity) ProtoReflect ¶ added in v0.3.0
func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message
func (*SubscriberIdentity) Reset ¶ added in v0.3.0
func (x *SubscriberIdentity) Reset()
func (*SubscriberIdentity) String ¶ added in v0.3.0
func (x *SubscriberIdentity) String() string
type SubscriberIdentity_ClusterIdentity ¶ added in v0.3.0
type SubscriberIdentity_ClusterIdentity struct {
ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3,oneof"`
}
type SubscriberIdentity_Pid ¶ added in v0.3.0
type Subscribers ¶ added in v0.3.0
type Subscribers struct { Subscribers []*SubscriberIdentity `protobuf:"bytes,1,rep,name=subscribers,proto3" json:"subscribers,omitempty"` // contains filtered or unexported fields }
A list of subscribers
func (*Subscribers) Descriptor
deprecated
added in
v0.3.0
func (*Subscribers) Descriptor() ([]byte, []int)
Deprecated: Use Subscribers.ProtoReflect.Descriptor instead.
func (*Subscribers) GetSubscribers ¶ added in v0.3.0
func (x *Subscribers) GetSubscribers() []*SubscriberIdentity
func (*Subscribers) ProtoMessage ¶ added in v0.3.0
func (*Subscribers) ProtoMessage()
func (*Subscribers) ProtoReflect ¶ added in v0.3.0
func (x *Subscribers) ProtoReflect() protoreflect.Message
func (*Subscribers) Reset ¶ added in v0.3.0
func (x *Subscribers) Reset()
func (*Subscribers) String ¶ added in v0.3.0
func (x *Subscribers) String() string
type TestMessage ¶ added in v0.3.0
type TestMessage struct { Number int32 `protobuf:"varint,1,opt,name=number,proto3" json:"number,omitempty"` // contains filtered or unexported fields }
func (*TestMessage) Descriptor
deprecated
added in
v0.3.0
func (*TestMessage) Descriptor() ([]byte, []int)
Deprecated: Use TestMessage.ProtoReflect.Descriptor instead.
func (*TestMessage) GetNumber ¶ added in v0.3.0
func (x *TestMessage) GetNumber() int32
func (*TestMessage) ProtoMessage ¶ added in v0.3.0
func (*TestMessage) ProtoMessage()
func (*TestMessage) ProtoReflect ¶ added in v0.3.0
func (x *TestMessage) ProtoReflect() protoreflect.Message
func (*TestMessage) Reset ¶ added in v0.3.0
func (x *TestMessage) Reset()
func (*TestMessage) String ¶ added in v0.3.0
func (x *TestMessage) String() string
type TopicActor ¶ added in v0.3.0
type TopicActor struct {
// contains filtered or unexported fields
}
func NewTopicActor ¶ added in v0.3.0
func NewTopicActor(store KeyValueStore[*Subscribers], logger *slog.Logger) *TopicActor
func (*TopicActor) Receive ¶ added in v0.3.0
func (t *TopicActor) Receive(c actor.Context)
type UnsubscribeRequest ¶ added in v0.3.0
type UnsubscribeRequest struct { Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` // contains filtered or unexported fields }
Sent to topic actor to remove a subscriber
func (*UnsubscribeRequest) Descriptor
deprecated
added in
v0.3.0
func (*UnsubscribeRequest) Descriptor() ([]byte, []int)
Deprecated: Use UnsubscribeRequest.ProtoReflect.Descriptor instead.
func (*UnsubscribeRequest) GetSubscriber ¶ added in v0.3.0
func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity
func (*UnsubscribeRequest) ProtoMessage ¶ added in v0.3.0
func (*UnsubscribeRequest) ProtoMessage()
func (*UnsubscribeRequest) ProtoReflect ¶ added in v0.3.0
func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message
func (*UnsubscribeRequest) Reset ¶ added in v0.3.0
func (x *UnsubscribeRequest) Reset()
func (*UnsubscribeRequest) String ¶ added in v0.3.0
func (x *UnsubscribeRequest) String() string
type UnsubscribeResponse ¶ added in v0.3.0
type UnsubscribeResponse struct {
// contains filtered or unexported fields
}
Unsubscribe acknowledgement
func (*UnsubscribeResponse) Descriptor
deprecated
added in
v0.3.0
func (*UnsubscribeResponse) Descriptor() ([]byte, []int)
Deprecated: Use UnsubscribeResponse.ProtoReflect.Descriptor instead.
func (*UnsubscribeResponse) ProtoMessage ¶ added in v0.3.0
func (*UnsubscribeResponse) ProtoMessage()
func (*UnsubscribeResponse) ProtoReflect ¶ added in v0.3.0
func (x *UnsubscribeResponse) ProtoReflect() protoreflect.Message
func (*UnsubscribeResponse) Reset ¶ added in v0.3.0
func (x *UnsubscribeResponse) Reset()
func (*UnsubscribeResponse) String ¶ added in v0.3.0
func (x *UnsubscribeResponse) String() string
Source Files ¶
- cluster.go
- cluster.pb.go
- cluster_config_context.go
- cluster_identity.go
- cluster_provider.go
- config.go
- config_opts.go
- consensus.go
- consensus_check_builder.go
- consensus_checks.go
- context.go
- default_context.go
- gossip.go
- gossip.pb.go
- gossip_actor.go
- gossip_state_management.go
- gossiper.go
- grain.go
- grain.pb.go
- grain_context.go
- identity_lookup.go
- identity_storage_lookup.go
- identity_storage_worker.go
- informer.go
- key_value_store.go
- kind.go
- member.go
- member_list.go
- member_state_delta.go
- member_status.go
- member_status_events.go
- member_strategy.go
- members.go
- messages.go
- options.go
- pid_cache.go
- pubsub.go
- pubsub.pb.go
- pubsub_batch.go
- pubsub_delivery.go
- pubsub_extensions.go
- pubsub_producer.go
- pubsub_producer_opts.go
- pubsub_publisher.go
- pubsub_test.pb.go
- pubsub_topic.go
- rendezvous.go
- round_robin.go
- types.go