Documentation
¶
Index ¶
- Constants
- Variables
- func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
- func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)
- func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error
- func Tell(ctx context.Context, to *PID, message proto.Message) error
- type Actor
- type ActorMetric
- func (x ActorMetric) ChidrenCount() uint64
- func (x ActorMetric) DeadlettersCount() uint64
- func (x ActorMetric) LatestProcessedDuration() time.Duration
- func (x ActorMetric) ProcessedCount() uint64
- func (x ActorMetric) RestartCount() uint64
- func (x ActorMetric) StashSize() uint64
- func (x ActorMetric) Uptime() int64
- type ActorRef
- type ActorSystem
- type Behavior
- type BoundedMailbox
- type ClusterConfig
- func (x *ClusterConfig) Discovery() discovery.Provider
- func (x *ClusterConfig) DiscoveryPort() int
- func (x *ClusterConfig) KVStoreSize() uint64
- func (x *ClusterConfig) Kinds() []Actor
- func (x *ClusterConfig) MinimumPeersQuorum() uint32
- func (x *ClusterConfig) PartitionCount() uint64
- func (x *ClusterConfig) PeersPort() int
- func (x *ClusterConfig) ReadQuorum() uint32
- func (x *ClusterConfig) ReplicaCount() uint32
- func (x *ClusterConfig) Validate() error
- func (x *ClusterConfig) WAL() string
- func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig
- func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig
- func (x *ClusterConfig) WithKVStoreSize(size uint64) *ClusterConfig
- func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig
- func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig
- func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig
- func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig
- func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig
- func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig
- func (x *ClusterConfig) WithWAL(dir string) *ClusterConfig
- func (x *ClusterConfig) WithWriteQuorum(count uint32) *ClusterConfig
- func (x *ClusterConfig) WriteQuorum() uint32
- type FuncOption
- type InternalError
- type Mailbox
- type Metric
- type Option
- func WithActorInitMaxRetries(value int) Option
- func WithActorInitTimeout(timeout time.Duration) Option
- func WithCluster(config *ClusterConfig) Option
- func WithCoordinatedShutdown(hooks ...ShutdownHook) Option
- func WithExpireActorAfter(duration time.Duration) Option
- func WithJanitorInterval(interval time.Duration) Option
- func WithLogger(logger log.Logger) Option
- func WithPartitionHasher(hasher hash.Hasher) Option
- func WithPassivationDisabled() Option
- func WithPeerStateLoopInterval(interval time.Duration) Option
- func WithRemote(config *remote.Config) Option
- func WithRemoting(host string, port int32) Option
- func WithShutdownTimeout(timeout time.Duration) Option
- func WithStash() Option
- func WithTLS(tlsInfo *TLSInfo) Option
- type OptionFunc
- type PID
- func (pid *PID) Actor() Actor
- func (pid *PID) ActorSystem() ActorSystem
- func (pid *PID) Address() *address.Address
- func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
- func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error)
- func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error
- func (pid *PID) Child(name string) (*PID, error)
- func (pid *PID) Children() []*PID
- func (pid *PID) ChildrenCount() int
- func (pid *PID) Equals(to *PID) bool
- func (pid *PID) ID() string
- func (pid *PID) IsRunning() bool
- func (pid *PID) IsSuspended() bool
- func (pid *PID) LatestProcessedDuration() time.Duration
- func (pid *PID) Logger() log.Logger
- func (pid *PID) Metric(ctx context.Context) *ActorMetric
- func (pid *PID) Name() string
- func (pid *PID) Parent() *PID
- func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error
- func (pid *PID) ProcessedCount() int
- func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, ...) (response *anypb.Any, err error)
- func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, ...) (responses []*anypb.Any, err error)
- func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error
- func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)
- func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error
- func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
- func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error
- func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error
- func (pid *PID) Restart(ctx context.Context) error
- func (pid *PID) RestartCount() int
- func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.Message) error
- func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, ...) (response proto.Message, err error)
- func (pid *PID) Shutdown(ctx context.Context) error
- func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
- func (pid *PID) StashSize() uint64
- func (pid *PID) Stop(ctx context.Context, cid *PID) error
- func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error
- func (pid *PID) UnWatch(cid *PID)
- func (pid *PID) Uptime() int64
- func (pid *PID) Watch(cid *PID)
- type PanicError
- type PostStopFunc
- type PreStartFunc
- type PriorityFunc
- type ReceiveContext
- func (rctx *ReceiveContext) ActorSystem() ActorSystem
- func (rctx *ReceiveContext) Ask(to *PID, message proto.Message, timeout time.Duration) (response proto.Message)
- func (rctx *ReceiveContext) BatchAsk(to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message)
- func (rctx *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)
- func (rctx *ReceiveContext) Become(behavior Behavior)
- func (rctx *ReceiveContext) BecomeStacked(behavior Behavior)
- func (rctx *ReceiveContext) Child(name string) *PID
- func (rctx *ReceiveContext) Children() []*PID
- func (rctx *ReceiveContext) Context() context.Context
- func (rctx *ReceiveContext) Err(err error)
- func (rctx *ReceiveContext) Forward(to *PID)
- func (rctx *ReceiveContext) ForwardTo(actorName string)
- func (rctx *ReceiveContext) Logger() log.Logger
- func (rctx *ReceiveContext) Message() proto.Message
- func (rctx *ReceiveContext) PipeTo(to *PID, task future.Task)
- func (rctx *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any)
- func (rctx *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any)
- func (rctx *ReceiveContext) RemoteBatchTell(to *address.Address, messages []proto.Message)
- func (rctx *ReceiveContext) RemoteForward(to *address.Address)
- func (rctx *ReceiveContext) RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)
- func (rctx *ReceiveContext) RemoteReSpawn(host string, port int, name string)
- func (rctx *ReceiveContext) RemoteSender() *address.Address
- func (rctx *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)
- func (rctx *ReceiveContext) Response(resp proto.Message)
- func (rctx *ReceiveContext) Self() *PID
- func (rctx *ReceiveContext) SendAsync(actorName string, message proto.Message)
- func (rctx *ReceiveContext) SendSync(actorName string, message proto.Message, timeout time.Duration) (response proto.Message)
- func (rctx *ReceiveContext) Sender() *PID
- func (rctx *ReceiveContext) Shutdown()
- func (rctx *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID
- func (rctx *ReceiveContext) Stash()
- func (rctx *ReceiveContext) Stop(child *PID)
- func (rctx *ReceiveContext) Tell(to *PID, message proto.Message)
- func (rctx *ReceiveContext) UnBecome()
- func (rctx *ReceiveContext) UnBecomeStacked()
- func (rctx *ReceiveContext) UnWatch(cid *PID)
- func (rctx *ReceiveContext) Unhandled()
- func (rctx *ReceiveContext) Unstash()
- func (rctx *ReceiveContext) UnstashAll()
- func (rctx *ReceiveContext) Watch(cid *PID)
- type ReceiveFunc
- type Remoting
- func (r *Remoting) Close()
- func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, ...) (response *anypb.Any, err error)
- func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, ...) (responses []*anypb.Any, err error)
- func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error
- func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)
- func (r *Remoting) RemoteReSpawn(ctx context.Context, host string, port int, name string) error
- func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
- func (r *Remoting) RemoteStop(ctx context.Context, host string, port int, name string) error
- func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error
- type RemotingOption
- type RestartDirective
- type ResumeDirective
- type RouterOption
- type RouterOptionFunc
- type RoutingStrategy
- type ShutdownHook
- type SpawnError
- type SpawnOption
- type StopDirective
- type SupervisorDirective
- type SupervisorStrategy
- type TLSInfo
- type UnboundedMailbox
- type UnboundedPriorityMailBox
Constants ¶
const ( // DefaultPassivationTimeout defines the default passivation timeout DefaultPassivationTimeout = 2 * time.Minute // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 // DefaultShutdownTimeout defines the default shutdown timeout DefaultShutdownTimeout = time.Minute // DefaultInitTimeout defines the default init timeout DefaultInitTimeout = time.Second // DefaultPeerStateLoopInterval defines the default peer state loop interval DefaultPeerStateLoopInterval = 10 * time.Second // DefaultAskTimeout defines the default ask timeout DefaultAskTimeout = 5 * time.Second // DefaultJanitorInterval defines the default GC interval // This helps cleanup dead actors from the given actor system DefaultJanitorInterval = 30 * time.Millisecond )
Variables ¶
var ( // ErrInvalidActorSystemName is returned when the actor system name is invalid ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')") // ErrDead means that the given actor is not alive ErrDead = errors.New("actor is not alive") // ErrUnhandled is used when an actor can handle a given message ErrUnhandled = errors.New("unhandled message") // ErrClusterDisabled is returned when cluster mode is not enabled while accessing cluster features ErrClusterDisabled = errors.New("cluster is not enabled") // ErrUndefinedActor is returned when an actor is defined ErrUndefinedActor = errors.New("actor is not defined") // ErrRequestTimeout is returned when sending an Ask message times out ErrRequestTimeout = errors.New("request timed out") // ErrRemotingDisabled is returned when remoting is not enabled ErrRemotingDisabled = errors.New("remoting is not enabled") // ErrAddressNotFound is returned when an actor address is not found ErrAddressNotFound = func(addr string) error { return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr)) } // ErrRemoteSendFailure is returned when remote message fails ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) } // ErrNameRequired is used when the actor system name is required ErrNameRequired = errors.New("actor system is required") // ErrInvalidInstance is returned when the creation of an actor instance fails ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance") // ErrActorNotFound is returned when an actor is not found ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) } // ErrMethodCallNotAllowed is returned when rpc call is not allowed ErrMethodCallNotAllowed = errors.New("method call not allowed") // ErrInvalidRemoteMessage is returned when an invalid remote message is sent ErrInvalidRemoteMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) } // ErrStashBufferNotSet when stashing is not set while requesting for messages to be stashed ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer") // ErrInitFailure is returned when the initialization of an actor fails. ErrInitFailure = func(err error) error { return fmt.Errorf("failed to initialize: %w", err) } // ErrActorSystemNotStarted is returned when the actor is not started while accessing the features of the actor system ErrActorSystemNotStarted = errors.New("actor system has not started yet") // ErrLocalAddress is returned when a remote address is used instead of a local address ErrLocalAddress = errors.New("address is a local address") // ErrInstanceNotAnActor is returned when we failed to create the instance of an actor ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface") // ErrTypeNotRegistered is returned when a given actor is not registered ErrTypeNotRegistered = errors.New("actor type is not registered") // ErrPeerNotFound is returned when locating a given peer ErrPeerNotFound = errors.New("peer is not found") // ErrUndefinedTask is returned when piping a long-running task result to an actor ErrUndefinedTask = errors.New("task is not defined") // ErrInvalidHost is returned when a request is sent to an invalid host ErrInvalidHost = errors.New("invalid host") // ErrFullMailbox is returned when the mailbox is full ErrFullMailbox = errors.New("mailbox is full") // ErrSchedulerNotStarted is returned when the scheduler has not started ErrSchedulerNotStarted = errors.New("scheduler has not started") // ErrInvalidMessage is returned when an invalid remote message is sent ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) } // ErrInvalidTimeout is returned when a given timeout is negative or zero ErrInvalidTimeout = errors.New("invalid timeout") // ErrPriorityMessageRequired is returned when a non-priority message is used in a priority mailbox ErrPriorityMessageRequired = errors.New("priority message type is required") // ErrActorAlreadyExists is returned when trying to create the same actor more than once ErrActorAlreadyExists = func(actorName string) error { return fmt.Errorf("actor=(%s) already exists", actorName) } // ErrInvalidTLSConfiguration is returned whent the TLS configuration is not properly set ErrInvalidTLSConfiguration = errors.New("TLS configuration is invalid") )
var DefaultSupervisorStrategies = []*SupervisorStrategy{ NewSupervisorStrategy(PanicError{}, NewStopDirective()), NewSupervisorStrategy(&runtime.PanicNilError{}, NewStopDirective()), }
DefaultSupervisorStrategies defines the default supervisor strategies
Functions ¶
func Ask ¶
func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
Ask sends a synchronous message to another actor and expect a response. This block until a response is received or timed out.
func BatchAsk ¶
func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors.
Types ¶
type Actor ¶
type Actor interface { // PreStart pre-starts the actor. This function can be used to set up some database connections // or some sort of initialization before the actor start processing messages // when the initialization failed the actor will not be started. // Use this function to set any fields that will be needed before the actor starts. // This hook helps set the default values needed by any fields of the actor. PreStart(ctx context.Context) error // Receive processes any message dropped into the actor mailbox. // The receiver of any message can either reply to the sender of the message with a new message or reply to the message synchronously // by config the reply of the message. The latter approach is often used when an external service is communicating to the actor. // One thing to know is that actor can communicate synchronously as well, just that will hinder the performance of the system. Receive(ctx *ReceiveContext) // PostStop is executed when the actor is shutting down. // The execution happens when every message that have not been processed yet will be processed before the actor shutdowns // This help free-up resources PostStop(ctx context.Context) error }
Actor represents the Actor interface This will be implemented by any user who wants to create an actor Any implementation must immutable which means all fields must be private(unexported). Only make use the PreStart hook to set the initial values.
type ActorMetric ¶ added in v2.11.1
type ActorMetric struct {
// contains filtered or unexported fields
}
ActorMetric defines actor specific metrics
func (ActorMetric) ChidrenCount ¶ added in v2.11.1
func (x ActorMetric) ChidrenCount() uint64
ChidrenCount returns the total number of child actor given a specific PID
func (ActorMetric) DeadlettersCount ¶ added in v2.11.1
func (x ActorMetric) DeadlettersCount() uint64
DeadlettersCount returns the total number of deadletters
func (ActorMetric) LatestProcessedDuration ¶ added in v2.11.1
func (x ActorMetric) LatestProcessedDuration() time.Duration
LatestProcessedDuration returns the duration of the latest message processed duration
func (ActorMetric) ProcessedCount ¶ added in v2.11.1
func (x ActorMetric) ProcessedCount() uint64
ProcessedCount returns the total number of messages processed at a given time
func (ActorMetric) RestartCount ¶ added in v2.11.1
func (x ActorMetric) RestartCount() uint64
RestartCount returns the total number of re-starts by the given PID
func (ActorMetric) StashSize ¶ added in v2.11.1
func (x ActorMetric) StashSize() uint64
StashSize returns the stash size at a given time
func (ActorMetric) Uptime ¶ added in v2.11.1
func (x ActorMetric) Uptime() int64
Uptime returns the number of seconds since the actor/system started
type ActorRef ¶ added in v2.11.1
type ActorRef struct {
// contains filtered or unexported fields
}
ActorRef defines the information about a given actor. The following information is captured by the ActorRef:
Name: The given actor name which is unique both locally and in a cluster environment. Actor's names only word characters that is:[a-zA-Z0-9] plus non-leading '-' or '_'.
Kind: The actor kind returns the reflected type of the underlying Actor backing the given actor reference.
Address: The actor address. One can use the address with Remoting to interact with the actor by sending messages.
type ActorSystem ¶
type ActorSystem interface { // Metric returns the actor system metric. // The metric does not include any cluster data Metric(ctx context.Context) *Metric // Name returns the actor system name Name() string // Actors returns the list of Actors that are alive on a given running node. // This does not account for the total number of actors in the cluster Actors() []*PID // ActorRefs retrieves a list of active actors, including both local actors // and, when cluster mode is enabled, actors across the cluster. Use this // method cautiously, as the scanning process may impact system performance. // If the cluster request fails, only locally active actors will be returned. // The timeout parameter defines the maximum duration for cluster-based requests // before they are terminated. ActorRefs(ctx context.Context, timeout time.Duration) []ActorRef // Start initializes the actor system. // To guarantee a clean shutdown during unexpected system terminations, // developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop. Start(ctx context.Context) error // Stop stops the actor system and does not terminate the program. // One needs to explicitly call os.Exit to terminate the program. Stop(ctx context.Context) error // Spawn creates an actor in the system and starts it Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error) // SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks // in the given optional options SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks // in the given optional options SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnRouter creates a new router. One can additionally set the router options. // A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel. // A single actor will only process one message at a time. SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) // Kill stops a given actor in the system Kill(ctx context.Context, name string) error // ReSpawn recreates a given actor in the system // During restart all messages that are in the mailbox and not yet processed will be ignored. // Only the direct alive children of the given actor will be shudown and respawned with their initial state. // Bear in mind that restarting an actor will reinitialize the actor to initial state. // In case any of the direct child restart fails the given actor will not be started at all. ReSpawn(ctx context.Context, name string) (*PID, error) // NumActors returns the total number of active actors on a given running node. // This does not account for the total number of actors in the cluster NumActors() uint64 // LocalActor returns the reference of a local actor. // A local actor is an actor that reside on the same node where the given actor system has started LocalActor(actorName string) (*PID, error) // RemoteActor returns the address of a remote actor when cluster is enabled // When the cluster mode is not enabled an actor not found error will be returned // One can always check whether cluster is enabled before calling this method or just use the ActorOf method. RemoteActor(ctx context.Context, actorName string) (addr *address.Address, err error) // ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled // When cluster mode is activated, the PID will be nil. // When remoting is enabled this method will return and error // An actor not found error is return when the actor is not found. ActorOf(ctx context.Context, actorName string) (addr *address.Address, pid *PID, err error) // InCluster states whether the actor system has started within a cluster of nodes InCluster() bool // GetPartition returns the partition where a given actor is located GetPartition(actorName string) int // Subscribe creates an event subscriber to consume events from the actor system. // Remember to use the Unsubscribe method to avoid resource leakage. Subscribe() (eventstream.Subscriber, error) // Unsubscribe unsubscribes a subscriber. Unsubscribe(subscriber eventstream.Subscriber) error // ScheduleOnce schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. // The message will be sent once ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error // Schedule schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. Schedule(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error // RemoteScheduleOnce schedules a message to be sent to a remote actor in the future. // This requires remoting to be enabled on the actor system. // This will send the given message to the actor after the given interval specified // The message will be sent once RemoteScheduleOnce(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error // RemoteSchedule schedules a message to be sent to a remote actor in the future. // This requires remoting to be enabled on the actor system. // This will send the given message to the actor after the given interval specified RemoteSchedule(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error // ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error // RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *address.Address, cronExpression string) error // PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system. // This address is empty when cluster mode is not activated PeerAddress() string // Register registers an actor for future use. This is necessary when creating an actor remotely Register(ctx context.Context, actor Actor) error // Deregister removes a registered actor from the registry Deregister(ctx context.Context, actor Actor) error // Logger returns the logger sets when creating the actor system Logger() log.Logger // Host returns the actor system node host address // This is the bind address for remote communication Host() string // Port returns the actor system node port. // This is the bind port for remote communication Port() int // Uptime returns the number of seconds since the actor system started Uptime() int64 // Running returns true when the actor system is running Running() bool // Run starts the actor system, blocks on the signals channel, and then // gracefully shuts the application down. // It's designed to make typical applications simple to run. // All of Run's functionality is implemented in terms of the exported // Start and Stop methods. Applications with more specialized needs // can use those methods directly instead of relying on Run. Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error) // contains filtered or unexported methods }
ActorSystem defines the contract of an actor system
func NewActorSystem ¶
func NewActorSystem(name string, opts ...Option) (ActorSystem, error)
NewActorSystem creates an instance of ActorSystem
type BoundedMailbox ¶ added in v2.7.0
type BoundedMailbox struct {
// contains filtered or unexported fields
}
BoundedMailbox defines a bounded mailbox using ring buffer queue This mailbox is thread-safe
func NewBoundedMailbox ¶ added in v2.7.0
func NewBoundedMailbox(capacity int) *BoundedMailbox
NewBoundedMailbox creates a new instance BoundedMailbox
func (*BoundedMailbox) Dequeue ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext)
Dequeue takes the mail from the mailbox It returns nil when the mailbox is empty
func (*BoundedMailbox) Dispose ¶ added in v2.10.1
func (mailbox *BoundedMailbox) Dispose()
Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.
func (*BoundedMailbox) Enqueue ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error
Enqueue places the given value in the mailbox This will return an error when the mailbox is full
func (*BoundedMailbox) IsEmpty ¶ added in v2.7.0
func (mailbox *BoundedMailbox) IsEmpty() bool
IsEmpty returns true when the mailbox is empty
func (*BoundedMailbox) Len ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Len() int64
Len returns queue length
type ClusterConfig ¶ added in v2.1.0
type ClusterConfig struct {
// contains filtered or unexported fields
}
ClusterConfig defines the cluster mode settings
func NewClusterConfig ¶ added in v2.1.0
func NewClusterConfig() *ClusterConfig
NewClusterConfig creates an instance of ClusterConfig
func (*ClusterConfig) Discovery ¶ added in v2.1.0
func (x *ClusterConfig) Discovery() discovery.Provider
Discovery returns the discovery provider
func (*ClusterConfig) DiscoveryPort ¶ added in v2.6.2
func (x *ClusterConfig) DiscoveryPort() int
DiscoveryPort returns the discovery port
func (*ClusterConfig) KVStoreSize ¶ added in v2.12.0
func (x *ClusterConfig) KVStoreSize() uint64
KVStoreSize returns the cluster storage size
func (*ClusterConfig) Kinds ¶ added in v2.1.0
func (x *ClusterConfig) Kinds() []Actor
Kinds returns the actor kinds
func (*ClusterConfig) MinimumPeersQuorum ¶ added in v2.1.0
func (x *ClusterConfig) MinimumPeersQuorum() uint32
MinimumPeersQuorum returns the minimum peers quorum
func (*ClusterConfig) PartitionCount ¶ added in v2.1.0
func (x *ClusterConfig) PartitionCount() uint64
PartitionCount returns the partition count
func (*ClusterConfig) PeersPort ¶ added in v2.1.0
func (x *ClusterConfig) PeersPort() int
PeersPort returns the peers port
func (*ClusterConfig) ReadQuorum ¶ added in v2.11.0
func (x *ClusterConfig) ReadQuorum() uint32
ReadQuorum returns the read quorum
func (*ClusterConfig) ReplicaCount ¶ added in v2.1.0
func (x *ClusterConfig) ReplicaCount() uint32
ReplicaCount returns the replica count.
func (*ClusterConfig) Validate ¶ added in v2.1.0
func (x *ClusterConfig) Validate() error
Validate validates the cluster config
func (*ClusterConfig) WAL ¶ added in v2.13.0
func (x *ClusterConfig) WAL() string
WAL returns the WAL directory
func (*ClusterConfig) WithDiscovery ¶ added in v2.1.0
func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig
WithDiscovery sets the cluster discovery provider
func (*ClusterConfig) WithDiscoveryPort ¶ added in v2.6.2
func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig
WithDiscoveryPort sets the discovery port
func (*ClusterConfig) WithKVStoreSize ¶ added in v2.12.0
func (x *ClusterConfig) WithKVStoreSize(size uint64) *ClusterConfig
WithKVStoreSize sets the key/value in-memory storage size The default values is 20MB
func (*ClusterConfig) WithKinds ¶ added in v2.1.0
func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig
WithKinds sets the cluster actor kinds
func (*ClusterConfig) WithMinimumPeersQuorum ¶ added in v2.1.0
func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig
WithMinimumPeersQuorum sets the cluster config minimum peers quorum
func (*ClusterConfig) WithPartitionCount ¶ added in v2.1.0
func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig
WithPartitionCount sets the cluster config partition count Partition cound should be a prime number. ref: https://medium.com/swlh/why-should-the-length-of-your-hash-table-be-a-prime-number-760ec65a75d1
func (*ClusterConfig) WithPeersPort ¶ added in v2.1.0
func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig
WithPeersPort sets the peers port
func (*ClusterConfig) WithReadQuorum ¶ added in v2.11.0
func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig
WithReadQuorum sets the read quorum Note: set this field means you have some advanced knowledge on quorum-based replica control The default value should be sufficient for most use cases
func (*ClusterConfig) WithReplicaCount ¶ added in v2.1.0
func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig
WithReplicaCount sets the cluster replica count. Note: set this field means you have some advanced knowledge on quorum-based replica control
func (*ClusterConfig) WithWAL ¶ added in v2.13.0
func (x *ClusterConfig) WithWAL(dir string) *ClusterConfig
WithWAL sets a custom WAL directory. GoAkt is required to have the permission to create this directory.
func (*ClusterConfig) WithWriteQuorum ¶ added in v2.11.0
func (x *ClusterConfig) WithWriteQuorum(count uint32) *ClusterConfig
WithWriteQuorum sets the write quorum Note: set this field means you have some advanced knowledge on quorum-based replica control The default value should be sufficient for most use cases
func (*ClusterConfig) WriteQuorum ¶ added in v2.11.0
func (x *ClusterConfig) WriteQuorum() uint32
WriteQuorum returns the write quorum
type FuncOption ¶
type FuncOption interface {
// Apply sets the Option value of a config.
Apply(actor *funcConfig)
}
FuncOption is the interface that applies a SpawnHook option.
func WithFuncMailbox ¶ added in v2.7.0
func WithFuncMailbox(mailbox Mailbox) FuncOption
WithFuncMailbox sets the mailbox to use when starting the func-based actor
func WithPostStop ¶
func WithPostStop(fn PostStopFunc) FuncOption
WithPostStop defines the PostStopFunc hook
func WithPreStart ¶
func WithPreStart(fn PreStartFunc) FuncOption
WithPreStart defines the PreStartFunc hook
type InternalError ¶ added in v2.11.0
type InternalError struct {
// contains filtered or unexported fields
}
InternalError defines an error that is explicit to the application
func NewInternalError ¶ added in v2.11.0
func NewInternalError(err error) InternalError
NewInternalError returns an intance of InternalError
func (InternalError) Error ¶ added in v2.11.0
func (i InternalError) Error() string
Error implements the standard error interface
type Mailbox ¶
type Mailbox interface { // Enqueue pushes a message into the mailbox. This returns an error // when the box is full Enqueue(msg *ReceiveContext) error // Dequeue fetches a message from the mailbox Dequeue() (msg *ReceiveContext) // IsEmpty returns true when the mailbox is empty IsEmpty() bool // Len returns the size of the mailbox Len() int64 // Dispose will dispose of this queue and free any blocked threads // in the Enqueue and/or Dequeue methods. Dispose() }
Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO
type Metric ¶ added in v2.11.1
type Metric struct {
// contains filtered or unexported fields
}
Metric defines the actor system metric
func (Metric) ActorsCount ¶ added in v2.11.1
ActorsCount returns the total number of actors either in the system or the total number of child actor given a specific PID
func (Metric) DeadlettersCount ¶ added in v2.11.1
DeadlettersCount returns the total number of deadletters
type Option ¶
type Option interface {
// Apply sets the Option value of a config.
Apply(sys *actorSystem)
}
Option is the interface that applies a configuration option.
func WithActorInitMaxRetries ¶
WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithActorInitTimeout ¶
WithActorInitTimeout sets how long in seconds an actor start timeout
func WithCluster ¶ added in v2.1.0
func WithCluster(config *ClusterConfig) Option
WithCluster enables the cluster mode
func WithCoordinatedShutdown ¶ added in v2.11.0
func WithCoordinatedShutdown(hooks ...ShutdownHook) Option
WithCoordinatedShutdown registers internal and user-defined tasks to be executed during the shutdown process. The defined tasks will be executed in the same order of insertion. Any failure will halt the shutdown process.
func WithExpireActorAfter ¶
WithExpireActorAfter sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.
func WithJanitorInterval ¶ added in v2.6.2
WithJanitorInterval sets the janitor interval
func WithLogger ¶
WithLogger sets the actor system custom log
func WithPartitionHasher ¶
WithPartitionHasher sets the partition hasher.
func WithPassivationDisabled ¶
func WithPassivationDisabled() Option
WithPassivationDisabled disable the passivation mode
func WithPeerStateLoopInterval ¶ added in v2.2.1
WithPeerStateLoopInterval sets the peer state loop interval
func WithRemote ¶ added in v2.12.0
WithRemote enables remoting on the actor system
func WithRemoting ¶
WithRemoting enables remoting on the actor system Deprecated: use WithRemote which provides better remoting configuration and optimization
func WithShutdownTimeout ¶
WithShutdownTimeout sets the shutdown timeout The timeout needs to be considerable reasonable based upon the total number of actors the system will probably needs. The graceful timeout is shared amongst all actors and children actors created in the system to graceful shutdown via a cancellation context.
func WithTLS ¶ added in v2.12.0
WithTLS configures TLS settings for both the Server and Client. Ensure that both the Server and Client are configured with the same root Certificate Authority (CA) to enable successful handshake and mutual authentication.
In cluster mode, all nodes must share the same root CA to establish secure communication and complete handshakes successfully.
type OptionFunc ¶
type OptionFunc func(*actorSystem)
OptionFunc implements the Option interface.
func (OptionFunc) Apply ¶
func (f OptionFunc) Apply(c *actorSystem)
type PID ¶
type PID struct {
// contains filtered or unexported fields
}
PID specifies an actor unique process With the PID one can send a ReceiveContext to the actor
var ( // NoSender means that there is no sender NoSender *PID // DefaultSupervisoryStrategy defines the default supervisory strategy DefaultSupervisoryStrategy = NewStopDirective() )
func (*PID) ActorSystem ¶
func (pid *PID) ActorSystem() ActorSystem
ActorSystem returns the actor system
func (*PID) Ask ¶
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
Ask sends a synchronous message to another actor and expect a response. This block until a response is received or timed out.
func (*PID) BatchAsk ¶
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.
func (*PID) BatchTell ¶
BatchTell sends an asynchronous bunch of messages to the given PID The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounter a single message it will fall back to a Tell call.
func (*PID) Children ¶
Children returns the list of all the direct descendants of the given actor Only alive actors are included in the list or an empty list is returned
func (*PID) ChildrenCount ¶ added in v2.7.0
ChildrenCount returns the total number of childrenMap for the given PID
func (*PID) ID ¶
ID is a convenient method that returns the actor unique identifier An actor unique identifier is its address in the actor system.
func (*PID) IsRunning ¶
IsRunning returns true when the actor is alive ready to process messages and false when the actor is stopped or not started at all
func (*PID) IsSuspended ¶ added in v2.10.0
IsSuspended returns true when the actor is suspended A suspended actor is a faulty actor
func (*PID) LatestProcessedDuration ¶ added in v2.7.0
LatestProcessedDuration returns the duration of the latest message processed
func (*PID) Metric ¶ added in v2.11.1
func (pid *PID) Metric(ctx context.Context) *ActorMetric
Metric returns the actor system metrics. The metric does not include any cluster data
func (*PID) PipeTo ¶ added in v2.2.0
PipeTo processes a long-started task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-started task is completed
func (*PID) ProcessedCount ¶ added in v2.7.0
ProcessedCount returns the total number of messages processed at a given time
func (*PID) RemoteAsk ¶
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)
RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (*PID) RemoteBatchAsk ¶
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)
RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.
func (*PID) RemoteBatchTell ¶
func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error
RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner Messages are processed one after the other in the order they are sent.
func (*PID) RemoteLookup ¶
func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)
RemoteLookup look for an actor address on a remote node.
func (*PID) RemoteReSpawn ¶
RemoteReSpawn restarts an actor on a remote node.
func (*PID) RemoteSpawn ¶
func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func (*PID) RemoteStop ¶
RemoteStop stops an actor on a remote node
func (*PID) RemoteTell ¶
RemoteTell sends a message to an actor remotely without expecting any reply
func (*PID) Restart ¶
Restart restarts the actor. During restart all messages that are in the mailbox and not yet processed will be ignored. Only the direct alive children of the given actor will be shudown and respawned with their initial state. Bear in mind that restarting an actor will reinitialize the actor to initial state. In case any of the direct child restart fails the given actor will not be started at all.
func (*PID) RestartCount ¶ added in v2.7.0
RestartCount returns the total number of re-starts by the given PID
func (*PID) SendAsync ¶ added in v2.6.2
SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.
func (*PID) SendSync ¶ added in v2.6.2
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error)
SendSync sends a synchronous message to another actor and expect a response. The location of the given actor is transparent to the caller. This block until a response is received or timed out.
func (*PID) Shutdown ¶
Shutdown gracefully shuts down the given actor All current messages in the mailbox will be processed before the actor shutdown after a period of time that can be configured. All child actors will be gracefully shutdown.
func (*PID) SpawnChild ¶
func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
SpawnChild creates a child actor and start watching it for error When the given child actor already exists its PID will only be returned
func (*PID) Stop ¶
Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped.
type PanicError ¶ added in v2.10.0
type PanicError struct {
// contains filtered or unexported fields
}
PanicError defines the panic error wrapping the underlying error
func NewPanicError ¶ added in v2.10.0
func NewPanicError(err error) PanicError
NewPanicError creates an instance of PanicError
func (PanicError) Error ¶ added in v2.10.0
func (e PanicError) Error() string
Error implements the standard error interface
type PostStopFunc ¶
PostStopFunc defines the PostStopFunc hook for an actor creation
type PreStartFunc ¶
PreStartFunc defines the PreStartFunc hook for an actor creation
type PriorityFunc ¶ added in v2.10.0
PriorityFunc defines the priority function that will help determines the priority of two messages
type ReceiveContext ¶
type ReceiveContext struct {
// contains filtered or unexported fields
}
ReceiveContext is the context that is used by the actor to receive messages
func (*ReceiveContext) ActorSystem ¶ added in v2.8.2
func (rctx *ReceiveContext) ActorSystem() ActorSystem
ActorSystem returns the actor system
func (*ReceiveContext) Ask ¶
func (rctx *ReceiveContext) Ask(to *PID, message proto.Message, timeout time.Duration) (response proto.Message)
Ask sends a synchronous message to another actor and expect a response. This method is good when interacting with a child actor. Ask has a timeout which can cause the sender to set the context error. When ask times out, the receiving actor does not know and may still process the message. It is recommended to set a good timeout to quickly receive response and try to avoid false positives
func (*ReceiveContext) BatchAsk ¶
func (rctx *ReceiveContext) BatchAsk(to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors.
func (*ReceiveContext) BatchTell ¶
func (rctx *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)
BatchTell sends an asynchronous bunch of messages to the given PID The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounter a single message it will fall back to a Tell call.
func (*ReceiveContext) Become ¶
func (rctx *ReceiveContext) Become(behavior Behavior)
Become switch the current behavior of the actor to a new behavior
func (*ReceiveContext) BecomeStacked ¶
func (rctx *ReceiveContext) BecomeStacked(behavior Behavior)
BecomeStacked sets a new behavior to the actor. The current message in process during the transition will still be processed with the current behavior before the transition. However, subsequent messages will be processed with the new behavior. One needs to call UnBecomeStacked to go the next the actor's behavior. which is the default behavior.
func (*ReceiveContext) Child ¶
func (rctx *ReceiveContext) Child(name string) *PID
Child returns the named child actor if it is alive
func (*ReceiveContext) Children ¶
func (rctx *ReceiveContext) Children() []*PID
Children returns the list of all the children of the given actor
func (*ReceiveContext) Context ¶
func (rctx *ReceiveContext) Context() context.Context
Context represents the context attached to the message
func (*ReceiveContext) Err ¶
func (rctx *ReceiveContext) Err(err error)
Err is used instead of panicking within a message handler.
func (*ReceiveContext) Forward ¶
func (rctx *ReceiveContext) Forward(to *PID)
Forward method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This operation does nothing when the receiving actor has not started This method can only be used on a single node actor system because the actor reference PID is needed
func (*ReceiveContext) ForwardTo ¶ added in v2.9.0
func (rctx *ReceiveContext) ForwardTo(actorName string)
ForwardTo method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This operation does nothing when the receiving actor has not started This method is only when the actor system is in the cluster mode because it is location transparent.
func (*ReceiveContext) Logger ¶ added in v2.8.2
func (rctx *ReceiveContext) Logger() log.Logger
Logger returns the logger used in the actor system
func (*ReceiveContext) Message ¶
func (rctx *ReceiveContext) Message() proto.Message
Message is the actual message sent
func (*ReceiveContext) PipeTo ¶ added in v2.2.0
func (rctx *ReceiveContext) PipeTo(to *PID, task future.Task)
PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed
func (*ReceiveContext) RemoteAsk ¶
func (rctx *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any)
RemoteAsk is used to send a message to an actor remotely and expect a response immediately.
func (*ReceiveContext) RemoteBatchAsk ¶
func (rctx *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any)
RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.
func (*ReceiveContext) RemoteBatchTell ¶
func (rctx *ReceiveContext) RemoteBatchTell(to *address.Address, messages []proto.Message)
RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner Messages are processed one after the other in the order they are sent.
func (*ReceiveContext) RemoteForward ¶ added in v2.9.0
func (rctx *ReceiveContext) RemoteForward(to *address.Address)
RemoteForward method works similarly to the RemoteTell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This method can only be used when remoting is enabled on the running actor system
func (*ReceiveContext) RemoteLookup ¶
func (rctx *ReceiveContext) RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)
RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done using the same actor system as the PID actor system
func (*ReceiveContext) RemoteReSpawn ¶
func (rctx *ReceiveContext) RemoteReSpawn(host string, port int, name string)
RemoteReSpawn restarts an actor on a remote node.
func (*ReceiveContext) RemoteSender ¶
func (rctx *ReceiveContext) RemoteSender() *address.Address
RemoteSender defines the remote sender of the message if it is a remote message This is set to NoSender when the message is not a remote message
func (*ReceiveContext) RemoteTell ¶
func (rctx *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)
RemoteTell sends a message to an actor remotely without expecting any reply
func (*ReceiveContext) Response ¶
func (rctx *ReceiveContext) Response(resp proto.Message)
Response sets the message response
func (*ReceiveContext) Self ¶
func (rctx *ReceiveContext) Self() *PID
Self returns the receiver PID of the message
func (*ReceiveContext) SendAsync ¶ added in v2.6.2
func (rctx *ReceiveContext) SendAsync(actorName string, message proto.Message)
SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.
func (*ReceiveContext) SendSync ¶ added in v2.6.2
func (rctx *ReceiveContext) SendSync(actorName string, message proto.Message, timeout time.Duration) (response proto.Message)
SendSync sends a synchronous message to another actor and expect a response. The location of the given actor is transparent to the caller. This block until a response is received or timed out.
func (*ReceiveContext) Shutdown ¶
func (rctx *ReceiveContext) Shutdown()
Shutdown gracefully shuts down the given actor All current messages in the mailbox will be processed before the actor shutdown after a period of time that can be configured. All child actors will be gracefully shutdown.
func (*ReceiveContext) Spawn ¶
func (rctx *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID
Spawn creates a child actor or return error
func (*ReceiveContext) Stash ¶
func (rctx *ReceiveContext) Stash()
Stash enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior
func (*ReceiveContext) Stop ¶
func (rctx *ReceiveContext) Stop(child *PID)
Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped. However, it returns an error when the child cannot be stopped.
func (*ReceiveContext) Tell ¶
func (rctx *ReceiveContext) Tell(to *PID, message proto.Message)
Tell sends an asynchronous message to another PID
func (*ReceiveContext) UnBecome ¶
func (rctx *ReceiveContext) UnBecome()
UnBecome reset the actor behavior to the default one
func (*ReceiveContext) UnBecomeStacked ¶
func (rctx *ReceiveContext) UnBecomeStacked()
UnBecomeStacked sets the actor behavior to the next behavior before BecomeStacked was called
func (*ReceiveContext) UnWatch ¶ added in v2.8.2
func (rctx *ReceiveContext) UnWatch(cid *PID)
UnWatch stops watching a given actor
func (*ReceiveContext) Unhandled ¶
func (rctx *ReceiveContext) Unhandled()
Unhandled is used to handle unhandled messages instead of throwing error
func (*ReceiveContext) Unstash ¶
func (rctx *ReceiveContext) Unstash()
Unstash unstashes the oldest message in the stash and prepends to the mailbox
func (*ReceiveContext) UnstashAll ¶
func (rctx *ReceiveContext) UnstashAll()
UnstashAll unstashes all messages from the stash buffer and prepends in the mailbox it keeps the messages in the same order as received, unstashing older messages before newer
func (*ReceiveContext) Watch ¶ added in v2.8.2
func (rctx *ReceiveContext) Watch(cid *PID)
Watch watches a given actor for a Terminated message when the watched actor shutdown
type ReceiveFunc ¶
ReceiveFunc is a message handling placeholder
type Remoting ¶ added in v2.8.0
type Remoting struct {
// contains filtered or unexported fields
}
Remoting defines the Remoting APIs This requires Remoting is enabled on the connected actor system
func NewRemoting ¶ added in v2.8.0
func NewRemoting(opts ...RemotingOption) *Remoting
NewRemoting creates an instance Remoting with an insecure connection. To use a secure connection one need to call the WithTLS method of the remoting instance to set the certificates of the secure connection This requires Remoting is enabled on the connected actor system Make sure to call Close to free up resources otherwise you may be leaking socket connections
One can also override the remoting option when calling any of the method for custom one.
func (*Remoting) Close ¶ added in v2.8.0
func (r *Remoting) Close()
Close closes the serviceClient connection
func (*Remoting) RemoteAsk ¶ added in v2.8.0
func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)
RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (*Remoting) RemoteBatchAsk ¶ added in v2.8.0
func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)
RemoteBatchAsk sends bulk messages to an actor with responses expected
func (*Remoting) RemoteBatchTell ¶ added in v2.8.0
func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error
RemoteBatchTell sends bulk asynchronous messages to an actor
func (*Remoting) RemoteLookup ¶ added in v2.8.0
func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)
RemoteLookup look for an actor address on a remote node.
func (*Remoting) RemoteReSpawn ¶ added in v2.8.0
RemoteReSpawn restarts actor on a remote node.
func (*Remoting) RemoteSpawn ¶ added in v2.8.0
func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func (*Remoting) RemoteStop ¶ added in v2.8.0
RemoteStop stops an actor on a remote node.
type RemotingOption ¶ added in v2.12.0
type RemotingOption func(*Remoting)
RemotingOption sets the remoting option
func WithRemotingTLS ¶ added in v2.12.0
func WithRemotingTLS(tlsConfig *tls.Config) RemotingOption
WithRemotingTLS configures the remoting system to use a secure connection for communication with the specified remote node. This requires a TLS client configuration to enable secure interactions with the remote actor system.
Ensure that the remote actor system is configured with TLS enabled and capable of completing a successful handshake. It is recommended that both systems share the same root Certificate Authority (CA) for mutual trust and secure communication.
type RestartDirective ¶
type RestartDirective struct {
// contains filtered or unexported fields
}
RestartDirective defines the supervisor restart directive
func NewRestartDirective ¶ added in v2.4.0
func NewRestartDirective() *RestartDirective
NewRestartDirective creates an instance of RestartDirective
func (*RestartDirective) MaxNumRetries ¶ added in v2.4.0
func (x *RestartDirective) MaxNumRetries() uint32
MaxNumRetries returns the max num retries
func (*RestartDirective) Timeout ¶ added in v2.4.0
func (x *RestartDirective) Timeout() time.Duration
Timeout returns the timeout
type ResumeDirective ¶ added in v2.4.0
type ResumeDirective struct{}
ResumeDirective defines the supervisor resume directive This ignores the failure and process the next message, instead
func NewResumeDirective ¶ added in v2.4.0
func NewResumeDirective() *ResumeDirective
NewResumeDirective creates an instance of ResumeDirective
type RouterOption ¶ added in v2.4.0
type RouterOption interface {
// Apply sets the Option value of a config.
Apply(cl *router)
}
RouterOption is the interface that applies a configuration option.
func WithRoutingStrategy ¶ added in v2.4.0
func WithRoutingStrategy(strategy RoutingStrategy) RouterOption
WithRoutingStrategy sets the routing strategy
type RouterOptionFunc ¶ added in v2.4.0
type RouterOptionFunc func(router *router)
RouterOptionFunc implements the Option interface.
func (RouterOptionFunc) Apply ¶ added in v2.4.0
func (f RouterOptionFunc) Apply(c *router)
type RoutingStrategy ¶ added in v2.4.0
type RoutingStrategy int
RoutingStrategy defines the routing strategy to use when defining routers
const ( RoundRobinRouting RoutingStrategy = iota RandomRouting FanOutRouting )
type ShutdownHook ¶ added in v2.11.0
ShutdownHook defines the shutdown hook to be executed alongside the termination of the actor system
type SpawnError ¶ added in v2.11.0
type SpawnError struct {
// contains filtered or unexported fields
}
SpawnError defines an error when re/creating an actor
func NewSpawnError ¶ added in v2.11.0
func NewSpawnError(err error) SpawnError
NewSpawnError returns an instance of SpawnError
func (SpawnError) Error ¶ added in v2.11.0
func (s SpawnError) Error() string
Error implements the standard error interface
type SpawnOption ¶ added in v2.7.0
type SpawnOption interface {
// Apply sets the Option value of a config.
Apply(config *spawnConfig)
}
SpawnOption is the interface that applies to
func WithLongLived ¶ added in v2.13.0
func WithLongLived() SpawnOption
WithLongLived ensures that the given actor, once created, will persist for the entire lifespan of the running actor system. Unlike short-lived actors that may be restarted or garbage-collected, a long-lived actor remains active until the actor system itself shuts down.
func WithMailbox ¶
func WithMailbox(mailbox Mailbox) SpawnOption
WithMailbox sets the mailbox to use when starting the given actor Care should be taken when using a specific mailbox for a given actor on how to handle messages particularly when it comes to priority mailbox
func WithPassivateAfter ¶ added in v2.13.0
func WithPassivateAfter(after time.Duration) SpawnOption
WithPassivateAfter sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.
func WithSupervisorStrategies ¶ added in v2.10.0
func WithSupervisorStrategies(supervisorStrategies ...*SupervisorStrategy) SpawnOption
WithSupervisorStrategies defines the supervisor strategies to apply when the given actor fails or panics during its messages processing
type StopDirective ¶
type StopDirective struct{}
StopDirective defines the supervisor stop directive
func NewStopDirective ¶ added in v2.4.0
func NewStopDirective() *StopDirective
NewStopDirective creates an instance of StopDirective
type SupervisorDirective ¶ added in v2.4.0
type SupervisorDirective interface {
// contains filtered or unexported methods
}
SupervisorDirective defines the supervisorQA directive
type SupervisorStrategy ¶ added in v2.10.0
type SupervisorStrategy struct {
// contains filtered or unexported fields
}
SupervisorStrategy defines the rules to apply to a faulty actor during message processing
func NewSupervisorStrategy ¶ added in v2.10.0
func NewSupervisorStrategy(err error, directive SupervisorDirective) *SupervisorStrategy
NewSupervisorStrategy creates an instance of SupervisorStrategy
func (*SupervisorStrategy) Directive ¶ added in v2.10.0
func (s *SupervisorStrategy) Directive() SupervisorDirective
Directive returns the directive of the supervisor strategy
func (*SupervisorStrategy) Error ¶ added in v2.10.0
func (s *SupervisorStrategy) Error() error
Kind returns the error type of the supervisor strategy
type TLSInfo ¶ added in v2.12.0
type TLSInfo struct { // ClientConfig defines the client TLS config ClientConfig *tls.Config // ServerConfig defines the server TLS config ServerConfig *tls.Config }
TLSInfo defines the TLS settings Ensure that both the Server and Client are configured with the same root Certificate Authority (CA) to enable successful handshake and mutual authentication.
type UnboundedMailbox ¶ added in v2.7.0
type UnboundedMailbox struct {
// contains filtered or unexported fields
}
UnboundedMailbox is a Multi-Producer-Single-Consumer Queue (FIFO) reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
func NewUnboundedMailbox ¶ added in v2.7.0
func NewUnboundedMailbox() *UnboundedMailbox
NewUnboundedMailbox create an instance of UnboundedMailbox
func (*UnboundedMailbox) Dequeue ¶ added in v2.7.0
func (m *UnboundedMailbox) Dequeue() *ReceiveContext
Dequeue takes the mail from the mailbox Returns nil if the mailbox is empty. Can be used in a single consumer (goroutine) only.
func (*UnboundedMailbox) Dispose ¶ added in v2.10.1
func (m *UnboundedMailbox) Dispose()
Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.
func (*UnboundedMailbox) Enqueue ¶ added in v2.7.0
func (m *UnboundedMailbox) Enqueue(value *ReceiveContext) error
Enqueue places the given value in the mailbox
func (*UnboundedMailbox) IsEmpty ¶ added in v2.7.0
func (m *UnboundedMailbox) IsEmpty() bool
IsEmpty returns true when the mailbox is empty
func (*UnboundedMailbox) Len ¶ added in v2.7.0
func (m *UnboundedMailbox) Len() int64
Len returns mailbox length
type UnboundedPriorityMailBox ¶ added in v2.10.0
type UnboundedPriorityMailBox struct {
// contains filtered or unexported fields
}
UnboundedPriorityMailBox is a Priority Queue (FIFO) It implements a binary heap (using the standard library container/heap)
func NewUnboundedPriorityMailBox ¶ added in v2.10.0
func NewUnboundedPriorityMailBox(priorityFunc PriorityFunc) *UnboundedPriorityMailBox
NewUnboundedPriorityMailBox creates an instance of UnboundedPriorityMailBox
func (*UnboundedPriorityMailBox) Dequeue ¶ added in v2.10.0
func (q *UnboundedPriorityMailBox) Dequeue() (msg *ReceiveContext)
Dequeue takes the mail from the mailbox based upon the priority function defined when initializing the mailbox
func (*UnboundedPriorityMailBox) Dispose ¶ added in v2.10.1
func (q *UnboundedPriorityMailBox) Dispose()
Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.
func (*UnboundedPriorityMailBox) Enqueue ¶ added in v2.10.0
func (q *UnboundedPriorityMailBox) Enqueue(msg *ReceiveContext) error
Enqueue places the given value in the mailbox The given message must be a priority message otherwise an error will be returned
func (*UnboundedPriorityMailBox) IsEmpty ¶ added in v2.10.0
func (q *UnboundedPriorityMailBox) IsEmpty() bool
IsEmpty returns true when the mailbox is empty
func (*UnboundedPriorityMailBox) Len ¶ added in v2.10.0
func (q *UnboundedPriorityMailBox) Len() int64
Len returns mailbox length
Source Files
¶
- actor.go
- actor_ref.go
- actor_system.go
- api.go
- behavior_stack.go
- bounded_mailbox.go
- cluster_config.go
- cluster_store.go
- deadletters.go
- errors.go
- func_actor.go
- go_scheduler.go
- janitor.go
- mailbox.go
- metric.go
- option.go
- pid.go
- pid_node.go
- pid_option.go
- pid_tree.go
- rebalancer.go
- receive_context.go
- reflection.go
- remoting.go
- root_guardian.go
- router.go
- scheduler.go
- sharded_map.go
- spawn_option.go
- stash.go
- supervisor_directive.go
- supervisor_strategy.go
- system_guardian.go
- tls_info.go
- unbounded_mailbox.go
- unbounded_priority_mailbox.go
- user_guardian.go
- util.go