Documentation ¶
Index ¶
- Constants
- Variables
- func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
- func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message) (response *anypb.Any, err error)
- func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error)
- func RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Message) error
- func Tell(ctx context.Context, to PID, message proto.Message) error
- type Actor
- type ActorSystem
- type Address
- func (a *Address) Host() string
- func (a *Address) HostPort() string
- func (a *Address) IsLocal() bool
- func (a *Address) IsRemote() bool
- func (a *Address) Port() int
- func (a *Address) Protocol() string
- func (a *Address) String() string
- func (a *Address) System() string
- func (a *Address) WithHost(host string) (*Address, error)
- func (a *Address) WithPort(port int) (*Address, error)
- func (a *Address) WithSystem(system string) *Address
- type Behavior
- type Mailbox
- type Option
- func WithActorInitMaxRetries(max int) Option
- func WithClustering(serviceDiscovery *discovery.ServiceDiscovery, partitionCount uint64) Option
- func WithExpireActorAfter(duration time.Duration) Option
- func WithLogger(logger log.Logger) Option
- func WithMailbox(mailbox Mailbox) Option
- func WithMailboxSize(size uint64) Option
- func WithPassivationDisabled() Option
- func WithRemoting(host string, port int32) Option
- func WithReplyTimeout(timeout time.Duration) Option
- func WithShutdownTimeout(timeout time.Duration) Option
- func WithStash(capacity uint64) Option
- func WithSupervisorStrategy(strategy StrategyDirective) Option
- func WithTelemetry(telemetry *telemetry.Telemetry) Option
- type OptionFunc
- type PID
- type Path
- type ReceiveContext
- type Reflection
- type StrategyDirective
- type TypesLoader
- type Unit
- type WatchMan
Constants ¶
const ( // RestartDirective defines the restart strategy when handling actors failure RestartDirective StrategyDirective = iota // StopDirective defines the stop strategy when handling actors failure StopDirective // DefaultPassivationTimeout defines the default passivation timeout DefaultPassivationTimeout = 2 * time.Minute // DefaultReplyTimeout defines the default send/reply timeout DefaultReplyTimeout = 100 * time.Millisecond // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 // DefaultSupervisoryStrategy defines the default supervisory strategy DefaultSupervisoryStrategy = StopDirective // DefaultShutdownTimeout defines the default shutdown timeout DefaultShutdownTimeout = 2 * time.Second )
Variables ¶
var ( ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')") ErrNotReady = errors.New("actor is not ready") ErrUnhandled = errors.New("unhandled message") ErrClusterNotEnabled = errors.New("cluster is not enabled") ErrUndefinedActor = errors.New("actor is not defined") ErrRequestTimeout = errors.New("request timed out") ErrRemotingNotEnabled = errors.New("remoting is not enabled") ErrInvalidActorSystemNode = connect.NewError(connect.CodeFailedPrecondition, errors.New("invalid actor system node")) ErrAddressNotFound = func(addr string) error { return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr)) } ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) } ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface") ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance") ErrTypeNotFound = func(typeName string) error { return fmt.Errorf("typeName=%s not found", typeName) } ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) } ErrMethodCallNotAllowed = errors.New("method call not allowed") ErrInvalidRemoteMessage = func(err error) error { return errors.Wrap(err, "invalid remote message") } ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer") )
var ( // ErrEmptyMailbox is returned when the mailbox is empty ErrEmptyMailbox = errors.New("mailbox is empty") // ErrFullMailbox is returned when the mailbox is full ErrFullMailbox = errors.New("mailbox is full") )
var (
ErrLocalAddress = errors.New("address is a local address")
)
var (
ErrNameRequired = errors.New("actor system is required")
)
var RemoteNoSender = new(addresspb.Address)
RemoteNoSender means that there is no sender
Functions ¶
func Ask ¶ added in v0.2.0
func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
Ask sends a synchronous message to another actor and expects a response. It blocks until a response is received or the timeout elapses.
func RemoteAsk ¶ added in v0.2.0
func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message) (response *anypb.Any, err error)
RemoteAsk sends a synchronous message to another actor remotely and expects a response.
func RemoteLookup ¶
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error)
RemoteLookup looks for an actor address on a remote node.
func RemoteTell ¶ added in v0.2.0
RemoteTell sends a message to an actor remotely without expecting any reply
Types ¶
type Actor ¶
type Actor interface { // PreStart pre-starts the actor. This function can be used to set up some database connections // or some sort of initialization before the actor start processing public PreStart(ctx context.Context) error // Receive processes any message dropped into the actor mailbox. // The receiver of any message can either reply to the sender of the message with a new message or reply to the message synchronously // by config the reply of the message. The latter approach is often used when an external service is communicating to the actor. // One thing to know is that actor can communicate synchronously as well, just that will hinder the performance of the system. Receive(ctx ReceiveContext) // PostStop is executed when the actor is shutting down. // The execution happens when every message that have not been processed yet will be processed before the actor shutdowns PostStop(ctx context.Context) error }
Actor represents the Actor interface. It is implemented by any user who wants to create an actor.
type ActorSystem ¶
type ActorSystem interface { // Name returns the actor system name Name() string // Actors returns the list of Actors that are alive in the actor system Actors() []PID // Start starts the actor system Start(ctx context.Context) error // Stop stops the actor system Stop(ctx context.Context) error // Spawn creates an actor in the system and starts it Spawn(ctx context.Context, name string, actor Actor) PID // Kill stops a given actor in the system Kill(ctx context.Context, name string) error // ReSpawn recreates a given actor in the system ReSpawn(ctx context.Context, name string) (PID, error) // NumActors returns the total number of active actors in the system NumActors() uint64 // LocalActor returns the reference of a local actor. // A local actor is an actor that reside on the same node where the given actor system is running LocalActor(ctx context.Context, actorName string) (PID, error) // RemoteActor returns the address of a remote actor when cluster is enabled // When the cluster mode is not enabled an actor not found error will be returned // One can always check whether cluster is enabled before calling this method or just use the ActorOf method. RemoteActor(ctx context.Context, actorName string) (addr *addresspb.Address, err error) // ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled // When cluster mode is activated, the PID will be nil. // When remoting is enabled this method will return and error // An actor not found error is return when the actor is not found. ActorOf(ctx context.Context, actorName string) (addr *addresspb.Address, pid PID, err error) // InCluster states whether the actor system is running within a cluster of nodes InCluster() bool // GetPartition returns the partition where a given actor is located GetPartition(ctx context.Context, actorName string) uint64 // contains filtered or unexported methods }
ActorSystem defines the contract of an actor system
func NewActorSystem ¶
func NewActorSystem(name string, opts ...Option) (ActorSystem, error)
NewActorSystem creates an instance of ActorSystem
type Address ¶ added in v0.2.0
type Address struct {
// contains filtered or unexported fields
}
Address represents the physical location under which an Actor can be reached. Examples are local addresses, identified by the ActorSystem’s name, and remote addresses, identified by protocol, host and port.
func NewAddress ¶ added in v0.2.0
NewAddress creates an instance of Address
func (*Address) HostPort ¶ added in v0.2.0
HostPort returns the host and port in the following string format: @host:port
func (*Address) IsRemote ¶ added in v0.2.0
IsRemote states whether the actor address is in the remote environment. This happens when remoting is enabled.
func (*Address) String ¶ added in v0.2.0
String returns the canonical String representation of this Address formatted as: `protocol://system@host:port`
func (*Address) WithHost ¶ added in v0.2.0
WithHost sets the host of a given Address and returns a new instance of the address
func (*Address) WithPort ¶ added in v0.2.0
WithPort sets the port of a given Address and returns a new instance of the address
func (*Address) WithSystem ¶ added in v0.2.0
WithSystem sets the actor system of a given Address and returns a new instance of the address
type Mailbox ¶ added in v0.6.0
type Mailbox interface { // Push pushes a message into the mailbox. This returns an error // when the box is full Push(msg ReceiveContext) error // Pop fetches a message from the mailbox Pop() (msg ReceiveContext, err error) // IsEmpty returns true when the mailbox is empty IsEmpty() bool // IsFull returns true when the mailbox is full IsFull() bool // Size returns the size of the buffer atomically Size() uint64 // Reset resets the mailbox Reset() // Clone clones the current mailbox and returns a new Mailbox with reset settings Clone() Mailbox // Capacity returns the mailbox capacity atomically Capacity() uint64 }
Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO
type Option ¶
type Option interface {
// Apply sets the Option value of a config.
Apply(sys *actorSystem)
}
Option is the interface that applies a configuration option.
func WithActorInitMaxRetries ¶
WithActorInitMaxRetries sets the number of times to retry an actor init process
func WithClustering ¶ added in v0.2.0
func WithClustering(serviceDiscovery *discovery.ServiceDiscovery, partitionCount uint64) Option
WithClustering enables clustering on the actor system. This also enables remoting on the actor system and sets the remotingHost to the cluster node host when the cluster is fully enabled.
func WithExpireActorAfter ¶
WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system
func WithLogger ¶
WithLogger sets the actor system's custom logger
func WithMailbox ¶ added in v0.6.0
WithMailbox sets the custom mailbox used by the actors in the actor system
func WithMailboxSize ¶ added in v0.6.0
WithMailboxSize sets the actors mailbox size
func WithPassivationDisabled ¶
func WithPassivationDisabled() Option
WithPassivationDisabled disables the passivation mode
func WithRemoting ¶
WithRemoting enables remoting on the actor system
func WithReplyTimeout ¶
WithReplyTimeout sets how long an actor has to reply to a command in a receive-reply pattern. The timeout is given as a time.Duration.
func WithShutdownTimeout ¶ added in v0.4.0
WithShutdownTimeout sets the shutdown timeout
func WithSupervisorStrategy ¶
func WithSupervisorStrategy(strategy StrategyDirective) Option
WithSupervisorStrategy sets the supervisor strategy
func WithTelemetry ¶
WithTelemetry sets the custom telemetry
type OptionFunc ¶
type OptionFunc func(*actorSystem)
OptionFunc implements the Option interface.
func (OptionFunc) Apply ¶
func (f OptionFunc) Apply(c *actorSystem)
type PID ¶
type PID interface { // Shutdown gracefully shuts down the given actor // All current messages in the mailbox will be processed before the actor shutdown after a period of time // that can be configured. All child actors will be gracefully shutdown. Shutdown(ctx context.Context) error // IsRunning returns true when the actor is running ready to process public and false // when the actor is stopped or not started at all IsRunning() bool // ReceivedCount returns the total number of public processed by the actor // at a given point in time while the actor heart is still beating ReceivedCount(ctx context.Context) uint64 // ErrorsCount returns the total number of panic attacks that occur while the actor is processing public // at a given point in time while the actor heart is still beating ErrorsCount(ctx context.Context) uint64 // SpawnChild creates a child actor SpawnChild(ctx context.Context, name string, actor Actor) (PID, error) // Restart restarts the actor Restart(ctx context.Context) error // Watch an actor Watch(pid PID) // UnWatch stops watching a given actor UnWatch(pid PID) // ActorSystem returns the underlying actor system ActorSystem() ActorSystem // ActorPath returns the path of the actor ActorPath() *Path // ActorHandle returns the underlying actor ActorHandle() Actor // Tell sends an asynchronous message to another PID Tell(ctx context.Context, to PID, message proto.Message) error // Ask sends a synchronous message to another actor and expect a response. Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error) // RemoteTell sends a message to an actor remotely without expecting any reply RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Message) error // RemoteAsk is used to send a message to an actor remotely and expect a response // immediately. With this type of message the receiver cannot communicate back to Sender // except reply the message with a response. This one-way communication. 
RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message) (response *anypb.Any, err error) // RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done // using the same actor system as the PID actor system RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) // RestartCount returns the number of times the actor has restarted RestartCount(ctx context.Context) uint64 // MailboxSize returns the mailbox size a given time MailboxSize(ctx context.Context) uint64 // Children returns the list of all the children of the given actor Children(ctx context.Context) []PID // StashSize returns the stash buffer size StashSize(ctx context.Context) uint64 // contains filtered or unexported methods }
PID defines the various actions one can perform on a given actor
var NoSender PID
NoSender means that there is no sender
type Path ¶
type Path struct {
// contains filtered or unexported fields
}
Path is a unique path to an actor
func (*Path) RemoteAddress ¶
RemoteAddress returns the remote address from the path
func (*Path) WithParent ¶
WithParent sets the parent actor path and returns a new path. The original path is left unmodified.
type ReceiveContext ¶
type ReceiveContext interface { // Context returns the context attached to the message Context() context.Context // Sender of the message. In the case of remote message this will be set to NoSender Sender() PID // Self represents the actor receiving the message. Self() PID // Message is the actual message sent Message() proto.Message // Response sets the message response // Use this method within the Actor.Receive method of the actor to sets a reply // This can only be used when we are request-response pattern. When it is an async communication // this operation will amount to nothing. Response(resp proto.Message) // RemoteSender defines the remote sender of the message if it is a remote message // This is set to RemoteNoSender when the message is not a remote message RemoteSender() *addresspb.Address // Become switch the current behavior of the actor to a new behavior // The current message in process during the transition will still be processed with the current // behavior before the transition. However, subsequent messages will be processed with the new behavior. // One needs to call UnBecome to reset the actor behavior to the default one which is the Actor.Receive method // which is the default behavior. Become(behavior Behavior) // UnBecome reset the actor behavior to the default one which is the // Actor.Receive method UnBecome() // BecomeStacked sets a new behavior to the actor to the top of the behavior stack, while maintaining the previous ones. // The current message in process during the transition will still be processed with the current // behavior before the transition. However, subsequent messages will be processed with the new behavior. // One needs to call UnBecomeStacked to go the previous the actor's behavior. // which is the default behavior. 
BecomeStacked(behavior Behavior) // UnBecomeStacked sets the actor behavior to the previous behavior before BecomeStacked was called UnBecomeStacked() // Stash adds the current message to the stash buffer Stash() // Unstash unstashes the oldest message in the stash and prepends to the mailbox Unstash() // UnstashAll unstashes all messages from the stash buffer and prepends in the mailbox // it keeps the messages in the same order as received, unstashing older messages before newer UnstashAll() }
ReceiveContext is the context that is used by the actor to receive messages
type Reflection ¶
type Reflection interface { // ActorOf creates a new instance of Actor from its concrete type ActorOf(rtype reflect.Type) (actor Actor, err error) // ActorFrom creates a new instance of Actor from its FQN ActorFrom(name string) (actor Actor, err error) }
Reflection helps create an instance dynamically
func NewReflection ¶
func NewReflection(loader TypesLoader) Reflection
NewReflection creates an instance of Reflection
type StrategyDirective ¶
type StrategyDirective int
StrategyDirective represents the supervisor strategy directive
type TypesLoader ¶
type TypesLoader interface { // Register an object with its fully qualified name Register(name string, v any) // Type returns the type of object, Type(v any) (reflect.Type, bool) // TypeByName returns the type of object given its name TypeByName(name string) (reflect.Type, bool) }
TypesLoader represents reflection typesLoader for dynamic loading and creation of actors at run-time
func NewTypesLoader ¶
func NewTypesLoader() TypesLoader
NewTypesLoader creates an instance of TypesLoader
type WatchMan ¶ added in v0.2.0
type WatchMan struct { ID PID // the ID of the actor watching ErrChan chan error // ErrChan the channel where to pass error message Done chan Unit // Done when watching is completed }
WatchMan is used to handle parent-child relationships. This helps handle error propagation from a child actor using any of the supervisory strategies.