Documentation ¶
Index ¶
- Constants
- Variables
- func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
- func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)
- func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error
- func RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, ...) (response *anypb.Any, err error)
- func RemoteBatchAsk(ctx context.Context, to *address.Address, messages ...proto.Message) (responses []*anypb.Any, err error)
- func RemoteBatchTell(ctx context.Context, to *address.Address, messages ...proto.Message) error
- func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)
- func RemoteReSpawn(ctx context.Context, host string, port int, name string) error
- func RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
- func RemoteStop(ctx context.Context, host string, port int, name string) error
- func RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error
- func Tell(ctx context.Context, to *PID, message proto.Message) error
- type Actor
- type ActorSystem
- type Behavior
- type BoundedMailbox
- type ClusterConfig
- func (x *ClusterConfig) Discovery() discovery.Provider
- func (x *ClusterConfig) DiscoveryPort() int
- func (x *ClusterConfig) Kinds() []Actor
- func (x *ClusterConfig) MinimumPeersQuorum() uint32
- func (x *ClusterConfig) PartitionCount() uint64
- func (x *ClusterConfig) PeersPort() int
- func (x *ClusterConfig) ReplicaCount() uint32
- func (x *ClusterConfig) Validate() error
- func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig
- func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig
- func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig
- func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig
- func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig
- func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig
- func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig
- type FuncOption
- type Mailbox
- type Option
- func WithActorInitMaxRetries(max int) Option
- func WithActorInitTimeout(timeout time.Duration) Option
- func WithCluster(config *ClusterConfig) Option
- func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, ...) Option
- func WithExpireActorAfter(duration time.Duration) Option
- func WithJanitorInterval(interval time.Duration) Option
- func WithLogger(logger log.Logger) Option
- func WithPartitionHasher(hasher hash.Hasher) Option
- func WithPassivationDisabled() Option
- func WithPeerStateLoopInterval(interval time.Duration) Option
- func WithRemoting(host string, port int32) Option
- func WithReplyTimeout(timeout time.Duration) Option
- func WithShutdownTimeout(timeout time.Duration) Option
- func WithStash() Option
- func WithSupervisorDirective(directive SupervisorDirective) Option
- type OptionFunc
- type PID
- func (pid *PID) Actor() Actor
- func (pid *PID) ActorSystem() ActorSystem
- func (pid *PID) Address() *address.Address
- func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error)
- func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error)
- func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error
- func (pid *PID) Child(name string) (*PID, error)
- func (pid *PID) Children() []*PID
- func (pid *PID) ChildrenCount() int
- func (pid *PID) Equals(to *PID) bool
- func (pid *PID) ID() string
- func (pid *PID) IsRunning() bool
- func (pid *PID) LatestProcessedDuration() time.Duration
- func (pid *PID) Logger() log.Logger
- func (pid *PID) Name() string
- func (pid *PID) Parents() []*PID
- func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error
- func (pid *PID) ProcessedCount() int
- func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message) (response *anypb.Any, err error)
- func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages ...proto.Message) (responses []*anypb.Any, err error)
- func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages ...proto.Message) error
- func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)
- func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error
- func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
- func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error
- func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error
- func (pid *PID) Restart(ctx context.Context) error
- func (pid *PID) RestartCount() int
- func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.Message) error
- func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message) (response proto.Message, err error)
- func (pid *PID) Shutdown(ctx context.Context) error
- func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
- func (pid *PID) StashSize() uint64
- func (pid *PID) Stop(ctx context.Context, cid *PID) error
- func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error
- func (pid *PID) UnWatch(cid *PID)
- func (pid *PID) Watch(cid *PID)
- type PostStopFunc
- type PreStartFunc
- type ReceiveContext
- func (c *ReceiveContext) Ask(to *PID, message proto.Message) (response proto.Message)
- func (c *ReceiveContext) BatchAsk(to *PID, messages ...proto.Message) (responses chan proto.Message)
- func (c *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)
- func (c *ReceiveContext) Become(behavior Behavior)
- func (c *ReceiveContext) BecomeStacked(behavior Behavior)
- func (c *ReceiveContext) Child(name string) *PID
- func (c *ReceiveContext) Children() []*PID
- func (c *ReceiveContext) Context() context.Context
- func (c *ReceiveContext) Err(err error)
- func (c *ReceiveContext) Forward(to *PID)
- func (c *ReceiveContext) Message() proto.Message
- func (c *ReceiveContext) PipeTo(to *PID, task future.Task)
- func (c *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message) (response *anypb.Any)
- func (c *ReceiveContext) RemoteBatchAsk(to *address.Address, messages ...proto.Message) (responses []*anypb.Any)
- func (c *ReceiveContext) RemoteBatchTell(to *address.Address, messages ...proto.Message)
- func (c *ReceiveContext) RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)
- func (c *ReceiveContext) RemoteReSpawn(host string, port int, name string)
- func (c *ReceiveContext) RemoteSender() *address.Address
- func (c *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)
- func (c *ReceiveContext) Response(resp proto.Message)
- func (c *ReceiveContext) Self() *PID
- func (c *ReceiveContext) SendAsync(actorName string, message proto.Message)
- func (c *ReceiveContext) SendSync(actorName string, message proto.Message) (response proto.Message)
- func (c *ReceiveContext) Sender() *PID
- func (c *ReceiveContext) Shutdown()
- func (c *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID
- func (c *ReceiveContext) Stash()
- func (c *ReceiveContext) Stop(child *PID)
- func (c *ReceiveContext) Tell(to *PID, message proto.Message)
- func (c *ReceiveContext) UnBecome()
- func (c *ReceiveContext) UnBecomeStacked()
- func (c *ReceiveContext) Unhandled()
- func (c *ReceiveContext) Unstash()
- func (c *ReceiveContext) UnstashAll()
- type ReceiveFunc
- type RestartDirective
- type ResumeDirective
- type RouterOption
- type RouterOptionFunc
- type RoutingStrategy
- type SpawnOption
- type StopDirective
- type SupervisorDirective
- type UnboundedMailbox
Constants ¶
const ( // DefaultPassivationTimeout defines the default passivation timeout DefaultPassivationTimeout = 2 * time.Minute // DefaultAskTimeout defines the default Ask timeout DefaultAskTimeout = 20 * time.Second // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 // DefaultShutdownTimeout defines the default shutdown timeout DefaultShutdownTimeout = 2 * time.Second // DefaultInitTimeout defines the default init timeout DefaultInitTimeout = time.Second // DefaultPeerStateLoopInterval defines the default peer state loop interval DefaultPeerStateLoopInterval = 10 * time.Second // DefaultJanitorInterval defines the default GC interval // This helps cleanup dead actors from the given actor system DefaultJanitorInterval = 30 * time.Millisecond )
Variables ¶
var ( // ErrInvalidActorSystemName is returned when the actor system name is invalid ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')") // ErrDead means that the given actor is not alive ErrDead = errors.New("actor is not alive") // ErrUnhandled is used when an actor can handle a given message ErrUnhandled = errors.New("unhandled message") // ErrClusterDisabled is returned when cluster mode is not enabled while accessing cluster features ErrClusterDisabled = errors.New("cluster is not enabled") // ErrUndefinedActor is returned when an actor is defined ErrUndefinedActor = errors.New("actor is not defined") // ErrRequestTimeout is returned when sending an Ask message times out ErrRequestTimeout = errors.New("request timed out") // ErrRemotingDisabled is returned when remoting is not enabled ErrRemotingDisabled = errors.New("remoting is not enabled") // ErrAddressNotFound is returned when an actor address is not found ErrAddressNotFound = func(addr string) error { return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr)) } // ErrRemoteSendFailure is returned when remote message fails ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) } // ErrNameRequired is used when the actor system name is required ErrNameRequired = errors.New("actor system is required") // ErrInvalidInstance is returned when the creation of an actor instance fails ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance") // ErrActorNotFound is returned when an actor is not found ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) } // ErrMethodCallNotAllowed is returned when rpc call is not allowed ErrMethodCallNotAllowed = errors.New("method call not allowed") // ErrInvalidRemoteMessage is returned when an invalid remote message is sent ErrInvalidRemoteMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) } // ErrStashBufferNotSet when stashing is not set while requesting for messages to be stashed ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer") // ErrInitFailure is returned when the initialization of an actor fails. ErrInitFailure = func(err error) error { return fmt.Errorf("failed to initialize: %w", err) } // ErrActorSystemNotStarted is returned when the actor is not started while accessing the features of the actor system ErrActorSystemNotStarted = errors.New("actor system has not started yet") // ErrLocalAddress is returned when a remote address is used instead of a local address ErrLocalAddress = errors.New("address is a local address") // ErrInstanceNotAnActor is returned when we failed to create the instance of an actor ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface") // ErrTypeNotRegistered is returned when a given actor is not registered ErrTypeNotRegistered = errors.New("actor type is not registered") // ErrPeerNotFound is returned when locating a given peer ErrPeerNotFound = errors.New("peer is not found") // ErrUndefinedTask is returned when piping a long-running task result to an actor ErrUndefinedTask = errors.New("task is not defined") // ErrInvalidHost is returned when a request is sent to an invalid host ErrInvalidHost = errors.New("invalid host") // ErrFullMailbox is returned when the mailbox is full ErrFullMailbox = errors.New("mailbox is full") )
Functions ¶
func Ask ¶
func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
Ask sends a synchronous message to another actor and expect a response. This block until a response is received or timed out.
func BatchAsk ¶
func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors.
func BatchTell ¶
BatchTell sends bulk asynchronous messages to an actor The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors.
func RemoteAsk ¶
func RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)
RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func RemoteBatchAsk ¶
func RemoteBatchAsk(ctx context.Context, to *address.Address, messages ...proto.Message) (responses []*anypb.Any, err error)
RemoteBatchAsk sends bulk messages to an actor with responses expected
func RemoteBatchTell ¶
RemoteBatchTell sends bulk asynchronous messages to an actor
func RemoteLookup ¶
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)
RemoteLookup look for an actor address on a remote node.
func RemoteReSpawn ¶
RemoteReSpawn restarts actor on a remote node.
func RemoteSpawn ¶
RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func RemoteStop ¶
RemoteStop stops an actor on a remote node.
func RemoteTell ¶
RemoteTell sends a message to an actor remotely without expecting any reply
Types ¶
type Actor ¶
type Actor interface { // PreStart pre-starts the actor. This function can be used to set up some database connections // or some sort of initialization before the actor start processing messages // when the initialization failed the actor will not be started. // Use this function to set any fields that will be needed before the actor starts. // This hook helps set the default values needed by any fields of the actor. PreStart(ctx context.Context) error // Receive processes any message dropped into the actor mailbox. // The receiver of any message can either reply to the sender of the message with a new message or reply to the message synchronously // by config the reply of the message. The latter approach is often used when an external service is communicating to the actor. // One thing to know is that actor can communicate synchronously as well, just that will hinder the performance of the system. Receive(ctx *ReceiveContext) // PostStop is executed when the actor is shutting down. // The execution happens when every message that have not been processed yet will be processed before the actor shutdowns // This help free-up resources PostStop(ctx context.Context) error }
Actor represents the Actor interface This will be implemented by any user who wants to create an actor Any implementation must immutable which means all fields must be private(unexported). Only make use the PreStart hook to set the initial values.
type ActorSystem ¶
type ActorSystem interface { // Name returns the actor system name Name() string // Actors returns the list of Actors that are alive in the actor system Actors() []*PID // Start starts the actor system Start(ctx context.Context) error // Stop stops the actor system Stop(ctx context.Context) error // Spawn creates an actor in the system and starts it Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error) // SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks // in the given optional options SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks // in the given optional options SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error) // SpawnRouter creates a new router. One can additionally set the router options. // A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel. // A single actor will only process one message at a time. SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error) // Kill stops a given actor in the system Kill(ctx context.Context, name string) error // ReSpawn recreates a given actor in the system ReSpawn(ctx context.Context, name string) (*PID, error) // NumActors returns the total number of active actors in the system NumActors() uint64 // LocalActor returns the reference of a local actor. // A local actor is an actor that reside on the same node where the given actor system is running LocalActor(actorName string) (*PID, error) // RemoteActor returns the address of a remote actor when cluster is enabled // When the cluster mode is not enabled an actor not found error will be returned // One can always check whether cluster is enabled before calling this method or just use the ActorOf method. RemoteActor(ctx context.Context, actorName string) (addr *address.Address, err error) // ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled // When cluster mode is activated, the PID will be nil. // When remoting is enabled this method will return and error // An actor not found error is return when the actor is not found. ActorOf(ctx context.Context, actorName string) (addr *address.Address, pid *PID, err error) // InCluster states whether the actor system is running within a cluster of nodes InCluster() bool // GetPartition returns the partition where a given actor is located GetPartition(actorName string) uint64 // Subscribe creates an event subscriber. Subscribe() (eventstream.Subscriber, error) // Unsubscribe unsubscribes a subscriber. Unsubscribe(subscriber eventstream.Subscriber) error // ScheduleOnce schedules a message that will be delivered to the receiver actor // This will send the given message to the actor after the given interval specified. // The message will be sent once ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error // RemoteScheduleOnce schedules a message to be sent to a remote actor in the future. // This requires remoting to be enabled on the actor system. // This will send the given message to the actor after the given interval specified // The message will be sent once RemoteScheduleOnce(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error // ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error // RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression. RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *address.Address, cronExpression string) error // PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system. // This address is empty when cluster mode is not activated PeerAddress() string // Register registers an actor for future use. This is necessary when creating an actor remotely Register(ctx context.Context, actor Actor) error // Deregister removes a registered actor from the registry Deregister(ctx context.Context, actor Actor) error // Logger returns the logger sets when creating the actor system Logger() log.Logger // contains filtered or unexported methods }
ActorSystem defines the contract of an actor system
func NewActorSystem ¶
func NewActorSystem(name string, opts ...Option) (ActorSystem, error)
NewActorSystem creates an instance of ActorSystem
type BoundedMailbox ¶ added in v2.7.0
type BoundedMailbox struct {
// contains filtered or unexported fields
}
BoundedMailbox defines a bounded mailbox using ring buffer queue This mailbox is thread-safe
func NewBoundedMailbox ¶ added in v2.7.0
func NewBoundedMailbox(cap int) *BoundedMailbox
NewBoundedMailbox creates a new instance BoundedMailbox
func (*BoundedMailbox) Dequeue ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext)
Dequeue takes the mail from the mailbox It returns nil when the mailbox is empty
func (*BoundedMailbox) Enqueue ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error
Enqueue places the given value in the mailbox This will return an error when the mailbox is full
func (*BoundedMailbox) IsEmpty ¶ added in v2.7.0
func (mailbox *BoundedMailbox) IsEmpty() bool
IsEmpty returns true when the mailbox is empty
func (*BoundedMailbox) Len ¶ added in v2.7.0
func (mailbox *BoundedMailbox) Len() int64
Len returns queue length
type ClusterConfig ¶ added in v2.1.0
type ClusterConfig struct {
// contains filtered or unexported fields
}
ClusterConfig defines the cluster mode settings
func NewClusterConfig ¶ added in v2.1.0
func NewClusterConfig() *ClusterConfig
NewClusterConfig creates an instance of ClusterConfig
func (*ClusterConfig) Discovery ¶ added in v2.1.0
func (x *ClusterConfig) Discovery() discovery.Provider
Discovery returns the discovery provider
func (*ClusterConfig) DiscoveryPort ¶ added in v2.6.2
func (x *ClusterConfig) DiscoveryPort() int
DiscoveryPort returns the discovery port
func (*ClusterConfig) Kinds ¶ added in v2.1.0
func (x *ClusterConfig) Kinds() []Actor
Kinds returns the actor kinds
func (*ClusterConfig) MinimumPeersQuorum ¶ added in v2.1.0
func (x *ClusterConfig) MinimumPeersQuorum() uint32
MinimumPeersQuorum returns the minimum peers quorum
func (*ClusterConfig) PartitionCount ¶ added in v2.1.0
func (x *ClusterConfig) PartitionCount() uint64
PartitionCount returns the partition count
func (*ClusterConfig) PeersPort ¶ added in v2.1.0
func (x *ClusterConfig) PeersPort() int
PeersPort returns the peers port
func (*ClusterConfig) ReplicaCount ¶ added in v2.1.0
func (x *ClusterConfig) ReplicaCount() uint32
ReplicaCount returns the replica count.
func (*ClusterConfig) Validate ¶ added in v2.1.0
func (x *ClusterConfig) Validate() error
Validate validates the cluster config
func (*ClusterConfig) WithDiscovery ¶ added in v2.1.0
func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig
WithDiscovery sets the cluster discovery provider
func (*ClusterConfig) WithDiscoveryPort ¶ added in v2.6.2
func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig
WithDiscoveryPort sets the discovery port
func (*ClusterConfig) WithKinds ¶ added in v2.1.0
func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig
WithKinds sets the cluster actor kinds
func (*ClusterConfig) WithMinimumPeersQuorum ¶ added in v2.1.0
func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig
WithMinimumPeersQuorum sets the cluster config minimum peers quorum
func (*ClusterConfig) WithPartitionCount ¶ added in v2.1.0
func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig
WithPartitionCount sets the cluster config partition count
func (*ClusterConfig) WithPeersPort ¶ added in v2.1.0
func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig
WithPeersPort sets the peers port
func (*ClusterConfig) WithReplicaCount ¶ added in v2.1.0
func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig
WithReplicaCount sets the cluster replica count.
type FuncOption ¶
type FuncOption interface {
// Apply sets the Option value of a config.
Apply(actor *funcConfig)
}
FuncOption is the interface that applies a SpawnHook option.
func WithFuncMailbox ¶ added in v2.7.0
func WithFuncMailbox(mailbox Mailbox) FuncOption
WithFuncMailbox sets the mailbox to use when starting the func-based actor
func WithPostStop ¶
func WithPostStop(fn PostStopFunc) FuncOption
WithPostStop defines the PostStopFunc hook
func WithPreStart ¶
func WithPreStart(fn PreStartFunc) FuncOption
WithPreStart defines the PreStartFunc hook
type Mailbox ¶
type Mailbox interface { // Enqueue pushes a message into the mailbox. This returns an error // when the box is full Enqueue(msg *ReceiveContext) error // Dequeue fetches a message from the mailbox Dequeue() (msg *ReceiveContext) // IsEmpty returns true when the mailbox is empty IsEmpty() bool // Len returns the size of the mailbox Len() int64 }
Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO
type Option ¶
type Option interface {
// Apply sets the Option value of a config.
Apply(sys *actorSystem)
}
Option is the interface that applies a configuration option.
func WithActorInitMaxRetries ¶
WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithActorInitTimeout ¶
WithActorInitTimeout sets how long in seconds an actor start timeout
func WithCluster ¶ added in v2.1.0
func WithCluster(config *ClusterConfig) Option
WithCluster enables the cluster mode
func WithClustering ¶
func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, discoveryPort, peersPort int, kinds ...Actor) Option
WithClustering enables the cluster mode. Deprecated: use rather WithCluster which offers a fluent api to set cluster configuration
func WithExpireActorAfter ¶
WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system
func WithJanitorInterval ¶ added in v2.6.2
WithJanitorInterval sets the janitor interval
func WithLogger ¶
WithLogger sets the actor system custom log
func WithPartitionHasher ¶
WithPartitionHasher sets the partition hasher.
func WithPassivationDisabled ¶
func WithPassivationDisabled() Option
WithPassivationDisabled disable the passivation mode
func WithPeerStateLoopInterval ¶ added in v2.2.1
WithPeerStateLoopInterval sets the peer state loop interval
func WithRemoting ¶
WithRemoting enables remoting on the actor system
func WithReplyTimeout ¶
WithReplyTimeout sets how long in seconds an actor should reply a command in a receive-reply pattern
func WithShutdownTimeout ¶
WithShutdownTimeout sets the shutdown timeout
func WithSupervisorDirective ¶ added in v2.4.0
func WithSupervisorDirective(directive SupervisorDirective) Option
WithSupervisorDirective sets the supervisor strategy directive
type OptionFunc ¶
type OptionFunc func(*actorSystem)
OptionFunc implements the Option interface.
func (OptionFunc) Apply ¶
func (f OptionFunc) Apply(c *actorSystem)
type PID ¶
type PID struct {
// contains filtered or unexported fields
}
PID specifies an actor unique process With the PID one can send a ReceiveContext to the actor
var ( // NoSender means that there is no sender NoSender *PID // DefaultSupervisoryStrategy defines the default supervisory strategy DefaultSupervisoryStrategy = NewStopDirective() )
func (*PID) ActorSystem ¶
func (pid *PID) ActorSystem() ActorSystem
ActorSystem returns the actor system
func (*PID) Ask ¶
func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error)
Ask sends a synchronous message to another actor and expect a response. This block until a response is received or timed out.
func (*PID) BatchAsk ¶
func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.
func (*PID) BatchTell ¶
BatchTell sends an asynchronous bunch of messages to the given PID The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounter a single message it will fall back to a Tell call.
func (*PID) Children ¶
Children returns the list of all the direct children of the given actor Only alive actors are included in the list or an empty list is returned
func (*PID) ChildrenCount ¶ added in v2.7.0
ChildrenCount returns the total number of children for the given PID
func (*PID) ID ¶
ID is a convenient method that returns the actor unique identifier An actor unique identifier is its address in the actor system.
func (*PID) IsRunning ¶
IsRunning returns true when the actor is alive ready to process messages and false when the actor is stopped or not started at all
func (*PID) LatestProcessedDuration ¶ added in v2.7.0
LatestProcessedDuration returns the duration of the latest message processed
func (*PID) Parents ¶ added in v2.4.0
Parents returns the list of all direct parents of a given actor. Only alive actors are included in the list or an empty list is returned
func (*PID) PipeTo ¶ added in v2.2.0
PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed
func (*PID) ProcessedCount ¶ added in v2.7.0
ProcessedCount returns the total number of messages processed at a given time
func (*PID) RemoteAsk ¶
func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message) (response *anypb.Any, err error)
RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func (*PID) RemoteBatchAsk ¶
func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages ...proto.Message) (responses []*anypb.Any, err error)
RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.
func (*PID) RemoteBatchTell ¶
func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages ...proto.Message) error
RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner Messages are processed one after the other in the order they are sent.
func (*PID) RemoteLookup ¶
func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)
RemoteLookup look for an actor address on a remote node.
func (*PID) RemoteReSpawn ¶
RemoteReSpawn restarts an actor on a remote node.
func (*PID) RemoteSpawn ¶
func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem
func (*PID) RemoteStop ¶
RemoteStop stops an actor on a remote node
func (*PID) RemoteTell ¶
RemoteTell sends a message to an actor remotely without expecting any reply
func (*PID) Restart ¶
Restart restarts the actor. During restart all messages that are in the mailbox and not yet processed will be ignored
func (*PID) RestartCount ¶ added in v2.7.0
RestartCount returns the total number of re-starts by the given PID
func (*PID) SendAsync ¶ added in v2.6.2
SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.
func (*PID) SendSync ¶ added in v2.6.2
func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message) (response proto.Message, err error)
SendSync sends a synchronous message to another actor and expect a response. The location of the given actor is transparent to the caller. This block until a response is received or timed out.
func (*PID) Shutdown ¶
Shutdown gracefully shuts down the given actor All current messages in the mailbox will be processed before the actor shutdown after a period of time that can be configured. All child actors will be gracefully shutdown.
func (*PID) SpawnChild ¶
func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
SpawnChild creates a child actor and start watching it for error When the given child actor already exists its PID will only be returned
func (*PID) Stop ¶
Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped.
type PostStopFunc ¶
PostStopFunc defines the PostStopFunc hook for an actor creation
type PreStartFunc ¶
PreStartFunc defines the PreStartFunc hook for an actor creation
type ReceiveContext ¶
type ReceiveContext struct {
// contains filtered or unexported fields
}
ReceiveContext is the context that is used by the actor to receive messages
func (*ReceiveContext) Ask ¶
Ask sends a synchronous message to another actor and expect a response. This method is good when interacting with a child actor. Ask has a timeout which can cause the sender to set the context error. When ask times out, the receiving actor does not know and may still process the message. It is recommended to set a good timeout to quickly receive response and try to avoid false positives
func (*ReceiveContext) BatchAsk ¶
func (c *ReceiveContext) BatchAsk(to *PID, messages ...proto.Message) (responses chan proto.Message)
BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors.
func (*ReceiveContext) BatchTell ¶
func (c *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)
BatchTell sends an asynchronous bunch of messages to the given PID The messages will be processed one after the other in the order they are sent This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounter a single message it will fall back to a Tell call.
func (*ReceiveContext) Become ¶
func (c *ReceiveContext) Become(behavior Behavior)
Become switch the current behavior of the actor to a new behavior
func (*ReceiveContext) BecomeStacked ¶
func (c *ReceiveContext) BecomeStacked(behavior Behavior)
BecomeStacked sets a new behavior to the actor. The current message in process during the transition will still be processed with the current behavior before the transition. However, subsequent messages will be processed with the new behavior. One needs to call UnBecomeStacked to go the next the actor's behavior. which is the default behavior.
func (*ReceiveContext) Child ¶
func (c *ReceiveContext) Child(name string) *PID
Child returns the named child actor if it is alive
func (*ReceiveContext) Children ¶
func (c *ReceiveContext) Children() []*PID
Children returns the list of all the children of the given actor
func (*ReceiveContext) Context ¶
func (c *ReceiveContext) Context() context.Context
Context represents the context attached to the message
func (*ReceiveContext) Err ¶
func (c *ReceiveContext) Err(err error)
Err is used instead of panicking within a message handler.
func (*ReceiveContext) Forward ¶
func (c *ReceiveContext) Forward(to *PID)
Forward method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This operation does nothing when the receiving actor is not running
func (*ReceiveContext) Message ¶
func (c *ReceiveContext) Message() proto.Message
Message is the actual message sent
func (*ReceiveContext) PipeTo ¶ added in v2.2.0
func (c *ReceiveContext) PipeTo(to *PID, task future.Task)
PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed
func (*ReceiveContext) RemoteAsk ¶
func (c *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message) (response *anypb.Any)
RemoteAsk is used to send a message to an actor remotely and expect a response immediately.
func (*ReceiveContext) RemoteBatchAsk ¶
func (c *ReceiveContext) RemoteBatchAsk(to *address.Address, messages ...proto.Message) (responses []*anypb.Any)
RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.
func (*ReceiveContext) RemoteBatchTell ¶
func (c *ReceiveContext) RemoteBatchTell(to *address.Address, messages ...proto.Message)
RemoteBatchTell sends a batch of messages to a remote actor in a way fire-and-forget manner Messages are processed one after the other in the order they are sent.
func (*ReceiveContext) RemoteLookup ¶
RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done using the same actor system as the PID actor system
func (*ReceiveContext) RemoteReSpawn ¶
func (c *ReceiveContext) RemoteReSpawn(host string, port int, name string)
RemoteReSpawn restarts an actor on a remote node.
func (*ReceiveContext) RemoteSender ¶
func (c *ReceiveContext) RemoteSender() *address.Address
RemoteSender defines the remote sender of the message if it is a remote message This is set to NoSender when the message is not a remote message
func (*ReceiveContext) RemoteTell ¶
func (c *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)
RemoteTell sends a message to an actor remotely without expecting any reply
func (*ReceiveContext) Response ¶
func (c *ReceiveContext) Response(resp proto.Message)
Response sets the message response
func (*ReceiveContext) Self ¶
func (c *ReceiveContext) Self() *PID
Self returns the receiver PID of the message
func (*ReceiveContext) SendAsync ¶ added in v2.6.2
func (c *ReceiveContext) SendAsync(actorName string, message proto.Message)
SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.
func (*ReceiveContext) SendSync ¶ added in v2.6.2
SendSync sends a synchronous message to another actor and expect a response. The location of the given actor is transparent to the caller. This block until a response is received or timed out.
func (*ReceiveContext) Shutdown ¶
func (c *ReceiveContext) Shutdown()
Shutdown gracefully shuts down the given actor All current messages in the mailbox will be processed before the actor shutdown after a period of time that can be configured. All child actors will be gracefully shutdown.
func (*ReceiveContext) Spawn ¶
func (c *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID
Spawn creates a child actor or return error
func (*ReceiveContext) Stash ¶
func (c *ReceiveContext) Stash()
Stash enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior
func (*ReceiveContext) Stop ¶
func (c *ReceiveContext) Stop(child *PID)
Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped. However, it returns an error when the child cannot be stopped.
func (*ReceiveContext) Tell ¶
func (c *ReceiveContext) Tell(to *PID, message proto.Message)
Tell sends an asynchronous message to another PID
func (*ReceiveContext) UnBecome ¶
func (c *ReceiveContext) UnBecome()
UnBecome reset the actor behavior to the default one
func (*ReceiveContext) UnBecomeStacked ¶
func (c *ReceiveContext) UnBecomeStacked()
UnBecomeStacked sets the actor behavior to the next behavior before BecomeStacked was called
func (*ReceiveContext) Unhandled ¶
func (c *ReceiveContext) Unhandled()
Unhandled is used to handle unhandled messages instead of throwing error
func (*ReceiveContext) Unstash ¶
func (c *ReceiveContext) Unstash()
Unstash unstashes the oldest message in the stash and prepends to the mailbox
func (*ReceiveContext) UnstashAll ¶
func (c *ReceiveContext) UnstashAll()
UnstashAll unstashes all messages from the stash buffer and prepends in the mailbox it keeps the messages in the same order as received, unstashing older messages before newer
type ReceiveFunc ¶
ReceiveFunc is a message handling placeholder
type RestartDirective ¶
type RestartDirective struct {
// contains filtered or unexported fields
}
RestartDirective defines testSupervisor restart directive
func NewRestartDirective ¶ added in v2.4.0
func NewRestartDirective() *RestartDirective
NewRestartDirective creates an instance of RestartDirective
func (*RestartDirective) MaxNumRetries ¶ added in v2.4.0
func (x *RestartDirective) MaxNumRetries() uint32
MaxNumRetries returns the max num retries
func (*RestartDirective) Timeout ¶ added in v2.4.0
func (x *RestartDirective) Timeout() time.Duration
Timeout returns the timeout
type ResumeDirective ¶ added in v2.4.0
type ResumeDirective struct{}
ResumeDirective defines the testSupervisor resume directive This ignores the failure and process the next message, instead
func NewResumeDirective ¶ added in v2.4.0
func NewResumeDirective() *ResumeDirective
NewResumeDirective creates an instance of ResumeDirective
type RouterOption ¶ added in v2.4.0
type RouterOption interface {
// Apply sets the Option value of a config.
Apply(cl *router)
}
RouterOption is the interface that applies a configuration option.
func WithRoutingStrategy ¶ added in v2.4.0
func WithRoutingStrategy(strategy RoutingStrategy) RouterOption
WithRoutingStrategy sets the routing strategy
type RouterOptionFunc ¶ added in v2.4.0
type RouterOptionFunc func(router *router)
RouterOptionFunc implements the Option interface.
func (RouterOptionFunc) Apply ¶ added in v2.4.0
func (f RouterOptionFunc) Apply(c *router)
type RoutingStrategy ¶ added in v2.4.0
type RoutingStrategy int
RoutingStrategy defines the routing strategy to use when defining routers
const ( RoundRobinRouting RoutingStrategy = iota RandomRouting FanOutRouting )
type SpawnOption ¶ added in v2.7.0
type SpawnOption interface {
// Apply sets the Option value of a config.
Apply(config *spawnConfig)
}
SpawnOption is the interface that applies to
func WithMailbox ¶
func WithMailbox(mailbox Mailbox) SpawnOption
WithMailbox sets the mailbox to use when starting the given actor
type StopDirective ¶
type StopDirective struct{}
StopDirective defines the testSupervisor stop directive
func NewStopDirective ¶ added in v2.4.0
func NewStopDirective() *StopDirective
NewStopDirective creates an instance of StopDirective
type SupervisorDirective ¶ added in v2.4.0
type SupervisorDirective interface {
// contains filtered or unexported methods
}
SupervisorDirective defines the testSupervisor directive
type UnboundedMailbox ¶ added in v2.7.0
type UnboundedMailbox struct {
// contains filtered or unexported fields
}
UnboundedMailbox is a Multi-Producer-Single-Consumer Queue (FIFO) reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
func NewUnboundedMailbox ¶ added in v2.7.0
func NewUnboundedMailbox() *UnboundedMailbox
NewUnboundedMailbox create an instance of UnboundedMailbox
func (*UnboundedMailbox) Dequeue ¶ added in v2.7.0
func (m *UnboundedMailbox) Dequeue() *ReceiveContext
Dequeue takes the mail from the mailbox Returns nil if the queue is empty. Can be used in a single consumer (goroutine) only.
func (*UnboundedMailbox) Enqueue ¶ added in v2.7.0
func (m *UnboundedMailbox) Enqueue(value *ReceiveContext) error
Enqueue places the given value in the mailbox
func (*UnboundedMailbox) IsEmpty ¶ added in v2.7.0
func (m *UnboundedMailbox) IsEmpty() bool
IsEmpty returns true when the queue is empty
func (*UnboundedMailbox) Len ¶ added in v2.7.0
func (m *UnboundedMailbox) Len() int64
Len returns queue length
Source Files ¶
- actor.go
- actor_system.go
- api.go
- behavior_stack.go
- bounded_mailbox.go
- cluster_config.go
- errors.go
- func_actor.go
- mailbox.go
- option.go
- pid.go
- pid_map.go
- pid_option.go
- receive_context.go
- redistribution.go
- reflection.go
- router.go
- scheduler.go
- spawn_option.go
- stash.go
- supervisor_directive.go
- system_supervisor.go
- types.go
- unbounded_mailbox.go