actors

package
v2.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2024 License: MIT Imports: 48 Imported by: 21

Documentation

Index

Constants

View Source
// Supervisory strategy directives and the default settings used by the actor system.
const (

	// RestartDirective is the supervisory directive that restarts an actor when it fails.
	// It is the first value of the iota sequence in this block (0).
	RestartDirective StrategyDirective = iota
	// StopDirective is the supervisory directive that stops an actor when it fails.
	// It is the next value of the iota sequence (1).
	StopDirective

	// DefaultPassivationTimeout defines the default passivation timeout (2 minutes).
	DefaultPassivationTimeout = 2 * time.Minute
	// DefaultReplyTimeout defines the default send/reply timeout (20 seconds).
	DefaultReplyTimeout = 20 * time.Second
	// DefaultInitMaxRetries defines the default number of retries for actor initialization (5).
	DefaultInitMaxRetries = 5
	// DefaultSupervisoryStrategy defines the default supervisory strategy, which is StopDirective.
	DefaultSupervisoryStrategy = StopDirective
	// DefaultShutdownTimeout defines the default shutdown timeout (2 seconds).
	DefaultShutdownTimeout = 2 * time.Second
	// DefaultInitTimeout defines the default init timeout (1 second).
	DefaultInitTimeout = time.Second
	// DefaultMailboxSize defines the default mailbox size (one million).
	DefaultMailboxSize = 1_000_000
	// DefaultPeerStateLoopInterval defines the default peer state loop interval (10 seconds).
	DefaultPeerStateLoopInterval = 10 * time.Second
	// DefaultStashCapacity defines the default stash buffer capacity (one thousand).
	DefaultStashCapacity = 1_000
)

Variables

View Source
// Errors exposed by the package. Most are sentinel error values; a few are
// constructor functions that build an error from an argument.
var (
	// ErrInvalidActorSystemName is returned when the actor system name is invalid.
	ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')")
	// ErrDead means that the given actor is not alive.
	ErrDead = errors.New("actor is not alive")
	// ErrUnhandled is used when an actor cannot handle a given message.
	ErrUnhandled = errors.New("unhandled message")
	// ErrClusterDisabled is returned when cluster mode is not enabled while accessing cluster features.
	ErrClusterDisabled = errors.New("cluster is not enabled")
	// ErrUndefinedActor is returned when an actor is not defined.
	ErrUndefinedActor = errors.New("actor is not defined")
	// ErrRequestTimeout is returned when sending an Ask message times out.
	ErrRequestTimeout = errors.New("request timed out")
	// ErrRemotingDisabled is returned when remoting is not enabled.
	ErrRemotingDisabled = errors.New("remoting is not enabled")
	// ErrAddressNotFound builds the error returned when an actor address is not found.
	// The result is a connect error carrying the CodeNotFound code.
	ErrAddressNotFound = func(addr string) error {
		return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr))
	}
	// ErrRemoteSendFailure builds the error returned when sending a remote message fails.
	// It wraps the given error in a connect error carrying the CodeInternal code.
	ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) }
	// ErrNameRequired is returned when the actor system name is required but has not been provided.
	ErrNameRequired = errors.New("actor system is required")
	// ErrInvalidInstance is returned when the creation of an actor instance fails.
	ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance")
	// ErrActorNotFound builds the error returned when the actor at the given path is not found.
	ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) }
	// ErrMethodCallNotAllowed is returned when an rpc call is not allowed.
	ErrMethodCallNotAllowed = errors.New("method call not allowed")
	// ErrInvalidRemoteMessage wraps the given error to signal that an invalid remote message was sent.
	ErrInvalidRemoteMessage = func(err error) error { return errors.Wrap(err, "invalid remote message") }
	// ErrStashBufferNotSet is returned when messages are requested to be stashed
	// but the actor has not been set up with a stash buffer.
	ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer")
	// ErrInitFailure wraps the given error to signal that the initialization of an actor failed.
	ErrInitFailure = func(err error) error { return errors.Wrap(err, "failed to initialize") }
	// ErrActorSystemNotStarted is returned when the features of the actor system
	// are accessed before the actor system has started.
	ErrActorSystemNotStarted = errors.New("actor system has not started yet")
	// ErrLocalAddress signals that the address involved is a local address.
	// NOTE(review): the previous comment said this is returned "when a remote address is used
	// instead of a local address", which reads inverted relative to the error message;
	// confirm the intended direction against the callers.
	ErrLocalAddress = errors.New("address is a local address")
	// ErrEmptyMailbox is returned when the mailbox is empty.
	ErrEmptyMailbox = errors.New("mailbox is empty")
	// ErrFullMailbox is returned when the mailbox is full.
	ErrFullMailbox = errors.New("mailbox is full")
	// ErrInstanceNotAnActor is returned when the created instance does not implement the Actor interface.
	ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface")
	// ErrTypeNotRegistered is returned when a given actor type is not registered.
	ErrTypeNotRegistered = errors.New("actor type is not registered")
	// ErrPeerNotFound is returned when a given peer cannot be found.
	ErrPeerNotFound = errors.New("peer is not found")
	// ErrKindsRequired is returned when the actor kinds are not provided when enabling the cluster mode.
	ErrKindsRequired = errors.New("actor kinds are required")
	// ErrUndefinedTask is returned when the long-running task whose result should be piped
	// to an actor is not defined.
	ErrUndefinedTask = errors.New("task is not defined")
	// ErrInvalidHost is returned when a request is sent to an invalid host.
	ErrInvalidHost = errors.New("invalid host")
)
View Source
// RemoteNoSender means that there is no remote sender.
// It is a pointer to a zero-valued goaktpb.Address.
var RemoteNoSender = new(goaktpb.Address)

RemoteNoSender means that there is no sender

Functions

func Ask

func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

Ask sends a synchronous message to another actor and expects a response. It blocks until a response is received or the timeout elapses.

func BatchAsk

func BatchAsk(ctx context.Context, to PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)

BatchAsk sends a batch of messages synchronously to the given PID and expects responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one-message-at-a-time processing by actors.

func BatchTell

func BatchTell(ctx context.Context, to PID, messages ...proto.Message) error

BatchTell sends bulk asynchronous messages to an actor

func IsEOF

func IsEOF(err error) bool

IsEOF returns true if the given error is an EOF error

func RemoteAsk

func RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error)

RemoteAsk sends a synchronous message to another actor remotely and expects a response.

func RemoteBatchAsk

func RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any, err error)

RemoteBatchAsk sends bulk messages to an actor with responses expected

func RemoteBatchTell

func RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) error

RemoteBatchTell sends bulk asynchronous messages to an actor

func RemoteLookup

func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)

RemoteLookup looks up an actor address on a remote node.

func RemoteReSpawn

func RemoteReSpawn(ctx context.Context, host string, port int, name string) error

RemoteReSpawn restarts an actor on a remote node.

func RemoteSpawn

func RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error

RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem

func RemoteStop

func RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops an actor on a remote node.

func RemoteTell

func RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message) error

RemoteTell sends a message to an actor remotely without expecting any reply

func Tell

func Tell(ctx context.Context, to PID, message proto.Message) error

Tell sends an asynchronous message to an actor

Types

type Actor

// Actor is the interface implemented by any user-defined actor.
// It exposes the lifecycle hooks (PreStart, PostStop) and the message handler (Receive).
type Actor interface {
	// PreStart pre-starts the actor. This function can be used to set up database connections
	// or perform any other initialization before the actor starts processing messages.
	// When the initialization fails the actor will not be started.
	// Use this function to set any fields that will be needed before the actor starts.
	PreStart(ctx context.Context) error
	// Receive processes any message dropped into the actor mailbox.
	// The receiver of a message can either reply to the sender with a new message or reply synchronously
	// by setting the response of the message. The latter approach is often used when an external service
	// is communicating with the actor.
	// Note that actors can communicate synchronously as well, but doing so will hinder the performance of the system.
	Receive(ctx ReceiveContext)
	// PostStop is executed when the actor is shutting down.
	// Every message that has not been processed yet is processed before the actor shuts down.
	// This hook helps free up resources.
	PostStop(ctx context.Context) error
}

Actor represents the Actor interface. This will be implemented by any user who wants to create an actor. Any implementation must be immutable, which means all fields must be private (unexported). Only use the PreStart hook to set the initial values.

type ActorSystem

// ActorSystem defines the contract of an actor system: lifecycle management,
// actor creation and lookup, cluster information, event subscription,
// message scheduling and actor type registration.
type ActorSystem interface {
	// Name returns the actor system name.
	Name() string
	// Actors returns the list of actors that are alive in the actor system.
	Actors() []PID
	// Start starts the actor system.
	Start(ctx context.Context) error
	// Stop stops the actor system.
	Stop(ctx context.Context) error
	// Spawn creates an actor in the system and starts it.
	Spawn(ctx context.Context, name string, actor Actor) (PID, error)
	// SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop
	// lifecycle hooks through the optional options.
	SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error)
	// Kill stops a given actor in the system.
	Kill(ctx context.Context, name string) error
	// ReSpawn recreates a given actor in the system.
	ReSpawn(ctx context.Context, name string) (PID, error)
	// NumActors returns the total number of active actors in the system.
	NumActors() uint64
	// LocalActor returns the reference of a local actor.
	// A local actor is an actor that resides on the same node where the given actor system is running.
	LocalActor(actorName string) (PID, error)
	// RemoteActor returns the address of a remote actor when cluster mode is enabled.
	// When cluster mode is not enabled an actor not found error will be returned.
	// One can always check whether cluster mode is enabled before calling this method or just use the ActorOf method.
	RemoteActor(ctx context.Context, actorName string) (addr *goaktpb.Address, err error)
	// ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled.
	// When cluster mode is activated, the PID will be nil.
	// When remoting is enabled this method will return an error.
	// NOTE(review): the sentence above likely means "when remoting is not enabled" — confirm against the implementation.
	// An actor not found error is returned when the actor is not found.
	ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error)
	// InCluster states whether the actor system is running within a cluster of nodes.
	InCluster() bool
	// GetPartition returns the partition where a given actor is located.
	GetPartition(actorName string) uint64
	// Subscribe creates an event subscriber.
	Subscribe() (eventstream.Subscriber, error)
	// Unsubscribe unsubscribes a subscriber.
	Unsubscribe(subscriber eventstream.Subscriber) error
	// ScheduleOnce schedules a message that will be delivered to the receiver actor.
	// The given message is sent to the actor after the specified interval.
	// The message is sent only once.
	ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error
	// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
	// This requires remoting to be enabled on the actor system.
	// The given message is sent to the actor after the specified interval.
	// The message is sent only once.
	RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error
	// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
	ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error
	// RemoteScheduleWithCron schedules a message to be sent to the actor at the given remote address
	// in the future using a cron expression.
	RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error
	// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes
	// to communicate with the actor system.
	// This address is empty when cluster mode is not activated.
	PeerAddress() string
	// Register registers an actor for future use. This is necessary when creating an actor remotely.
	Register(ctx context.Context, actor Actor) error
	// Deregister removes a registered actor from the registry.
	Deregister(ctx context.Context, actor Actor) error
	// contains filtered or unexported methods
}

ActorSystem defines the contract of an actor system

func NewActorSystem

func NewActorSystem(name string, opts ...Option) (ActorSystem, error)

NewActorSystem creates an instance of ActorSystem

type Address

type Address struct {
	// contains filtered or unexported fields
}

Address represents the physical location under which an Actor can be reached. Examples are local addresses, identified by the ActorSystem’s name, and remote addresses, identified by protocol, host and port.

func NewAddress

func NewAddress(system string, host string, port int) *Address

NewAddress creates an instance of Address

func (*Address) Host

func (a *Address) Host() string

Host returns the host

func (*Address) HostPort

func (a *Address) HostPort() string

HostPort returns the host and port in the following string format @host:port

func (*Address) IsLocal

func (a *Address) IsLocal() bool

IsLocal reports whether the actor address is a local address

func (*Address) IsRemote

func (a *Address) IsRemote() bool

IsRemote states whether the actor address is in the remote environment. This happens when remoting is enabled.

func (*Address) Port

func (a *Address) Port() int

Port returns the port number

func (*Address) Protocol

func (a *Address) Protocol() string

Protocol returns the protocol

func (*Address) String

func (a *Address) String() string

String returns the canonical String representation of this Address formatted as: `protocol://system@host:port`

func (*Address) System

func (a *Address) System() string

System returns the actor system name

func (*Address) WithHost

func (a *Address) WithHost(host string) (*Address, error)

WithHost sets the host of a given Address and returns a new instance of the address

func (*Address) WithPort

func (a *Address) WithPort(port int) (*Address, error)

WithPort sets the port of a given Address and returns a new instance of the address

func (*Address) WithSystem

func (a *Address) WithSystem(system string) *Address

WithSystem sets the actor system of a given Address and returns a new instance of the address

type Behavior

// Behavior defines an actor behavior: a function that handles a ReceiveContext.
type Behavior func(ctx ReceiveContext)

Behavior defines an actor behavior

type ClusterConfig added in v2.1.0

type ClusterConfig struct {
	// contains filtered or unexported fields
}

ClusterConfig defines the cluster mode settings

func NewClusterConfig added in v2.1.0

func NewClusterConfig() *ClusterConfig

NewClusterConfig creates an instance of ClusterConfig

func (*ClusterConfig) Discovery added in v2.1.0

func (x *ClusterConfig) Discovery() discovery.Provider

Discovery returns the discovery provider

func (*ClusterConfig) GossipPort added in v2.1.0

func (x *ClusterConfig) GossipPort() int

GossipPort returns the gossip port

func (*ClusterConfig) Kinds added in v2.1.0

func (x *ClusterConfig) Kinds() []Actor

Kinds returns the actor kinds

func (*ClusterConfig) MinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) MinimumPeersQuorum() uint32

MinimumPeersQuorum returns the minimum peers quorum

func (*ClusterConfig) PartitionCount added in v2.1.0

func (x *ClusterConfig) PartitionCount() uint64

PartitionCount returns the partition count

func (*ClusterConfig) PeersPort added in v2.1.0

func (x *ClusterConfig) PeersPort() int

PeersPort returns the peers port

func (*ClusterConfig) ReplicaCount added in v2.1.0

func (x *ClusterConfig) ReplicaCount() uint32

ReplicaCount returns the replica count.

func (*ClusterConfig) Validate added in v2.1.0

func (x *ClusterConfig) Validate() error

Validate validates the cluster config

func (*ClusterConfig) WithDiscovery added in v2.1.0

func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig

WithDiscovery sets the cluster discovery provider

func (*ClusterConfig) WithGossipPort added in v2.1.0

func (x *ClusterConfig) WithGossipPort(port int) *ClusterConfig

WithGossipPort sets the gossip port

func (*ClusterConfig) WithKinds added in v2.1.0

func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig

WithKinds sets the cluster actor kinds

func (*ClusterConfig) WithMinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig

WithMinimumPeersQuorum sets the cluster config minimum peers quorum

func (*ClusterConfig) WithPartitionCount added in v2.1.0

func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig

WithPartitionCount sets the cluster config partition count

func (*ClusterConfig) WithPeersPort added in v2.1.0

func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig

WithPeersPort sets the peers port

func (*ClusterConfig) WithReplicaCount added in v2.1.0

func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig

WithReplicaCount sets the cluster replica count.

type FuncOption

// FuncOption is the interface implemented by options that configure a funcActor.
type FuncOption interface {
	// Apply applies the option to the given funcActor.
	Apply(actor *funcActor)
}

FuncOption is the interface that applies a SpawnHook option.

func WithPostStop

func WithPostStop(fn PostStopFunc) FuncOption

WithPostStop defines the PostStopFunc hook

func WithPreStart

func WithPreStart(fn PreStartFunc) FuncOption

WithPreStart defines the PreStartFunc hook

type Mailbox

// Mailbox defines the actor mailbox: a buffer of ReceiveContext messages
// with a capacity, supporting push, pop and iteration.
type Mailbox interface {
	// Push pushes a message into the mailbox.
	// It returns an error when the mailbox is full.
	Push(msg ReceiveContext) error
	// Pop fetches a message from the mailbox.
	Pop() (msg ReceiveContext, err error)
	// Iterator returns a receive-only channel that can be used to iterate over the mailbox.
	Iterator() <-chan ReceiveContext
	// IsEmpty returns true when the mailbox is empty.
	IsEmpty() bool
	// IsFull returns true when the mailbox is full.
	IsFull() bool
	// Size returns the size of the buffer atomically.
	Size() uint64
	// Reset resets the mailbox.
	Reset()
	// Clone clones the current mailbox and returns a new Mailbox with reset settings.
	Clone() Mailbox
	// Capacity returns the mailbox capacity atomically.
	Capacity() uint64
}

Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO

type Option

// Option is the interface implemented by configuration options of the actor system.
type Option interface {
	// Apply applies the option to the given actorSystem.
	Apply(sys *actorSystem)
}

Option is the interface that applies a configuration option.

func WithActorInitMaxRetries

func WithActorInitMaxRetries(max int) Option

WithActorInitMaxRetries sets the number of times to retry an actor init process

func WithActorInitTimeout

func WithActorInitTimeout(timeout time.Duration) Option

WithActorInitTimeout sets how long an actor is given to start before timing out. The value is a time.Duration, not a bare number of seconds.

func WithCluster added in v2.1.0

func WithCluster(config *ClusterConfig) Option

WithCluster enables the cluster mode

func WithClustering

func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, gossipPort, peersPort int, kinds ...Actor) Option

WithClustering enables the cluster mode. Deprecated: use WithCluster instead, which offers a fluent API to set the cluster configuration.

func WithExpireActorAfter

func WithExpireActorAfter(duration time.Duration) Option

WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the actor system custom logger

func WithMailbox

func WithMailbox(mailbox Mailbox) Option

WithMailbox sets the custom mailbox used by the actors in the actor system

func WithMailboxSize

func WithMailboxSize(size uint64) Option

WithMailboxSize sets the actors mailbox size

func WithMetric

func WithMetric() Option

WithMetric enables metrics

func WithPartitionHasher

func WithPartitionHasher(hasher hash.Hasher) Option

WithPartitionHasher sets the partition hasher.

func WithPassivationDisabled

func WithPassivationDisabled() Option

WithPassivationDisabled disables the passivation mode

func WithPeerStateLoopInterval added in v2.2.1

func WithPeerStateLoopInterval(interval time.Duration) Option

WithPeerStateLoopInterval sets the peer state loop interval

func WithRemoting

func WithRemoting(host string, port int32) Option

WithRemoting enables remoting on the actor system

func WithReplyTimeout

func WithReplyTimeout(timeout time.Duration) Option

WithReplyTimeout sets how long an actor has to reply to a command in a receive-reply pattern. The value is a time.Duration, not a bare number of seconds.

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) Option

WithShutdownTimeout sets the shutdown timeout

func WithStash

func WithStash(capacity uint64) Option

WithStash sets the stash buffer size

func WithSupervisorStrategy

func WithSupervisorStrategy(strategy StrategyDirective) Option

WithSupervisorStrategy sets the supervisor strategy

func WithTelemetry

func WithTelemetry(telemetry *telemetry.Telemetry) Option

WithTelemetry sets the custom telemetry

func WithTracing

func WithTracing() Option

WithTracing enables tracing

type OptionFunc

// OptionFunc is a function type that configures an actorSystem.
// Its Apply method makes it usable as an Option.
type OptionFunc func(*actorSystem)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(c *actorSystem)

type PID

// PID defines the various actions one can perform on a given actor:
// lifecycle control, child management, watching, local and remote messaging,
// and piping the result of long-running tasks.
type PID interface {
	// ID is a convenient method that returns the actor unique identifier.
	// An actor unique identifier is its address in the actor system.
	ID() string
	// Name returns the actor given name.
	Name() string
	// Shutdown gracefully shuts down the given actor.
	// All current messages in the mailbox will be processed before the actor shuts down, after a period of time
	// that can be configured. All child actors will be gracefully shut down.
	Shutdown(ctx context.Context) error
	// IsRunning returns true when the actor is running and ready to process messages, and false
	// when the actor is stopped or not started at all.
	IsRunning() bool
	// SpawnChild creates a child actor.
	// When the given child actor already exists, only its PID is returned.
	SpawnChild(ctx context.Context, name string, actor Actor) (PID, error)
	// Restart restarts the actor.
	Restart(ctx context.Context) error
	// Watch starts watching the given actor.
	Watch(pid PID)
	// UnWatch stops watching a given actor.
	UnWatch(pid PID)
	// ActorSystem returns the underlying actor system.
	ActorSystem() ActorSystem
	// ActorPath returns the path of the actor.
	ActorPath() *Path
	// ActorHandle returns the underlying actor.
	ActorHandle() Actor
	// Tell sends an asynchronous message to another PID.
	Tell(ctx context.Context, to PID, message proto.Message) error
	// BatchTell sends a batch of messages asynchronously to the given PID.
	// The messages will be processed one after the other in the order they are sent.
	// This is a design choice to follow the simple principle of one-message-at-a-time processing by actors.
	// When BatchTell encounters a single message it will fall back to a Tell call.
	BatchTell(ctx context.Context, to PID, messages ...proto.Message) error
	// Ask sends a synchronous message to another actor and expects a response.
	Ask(ctx context.Context, to PID, message proto.Message) (response proto.Message, err error)
	// BatchAsk sends a batch of messages synchronously to the given PID and expects responses in the same order as the messages.
	// The messages will be processed one after the other in the order they are sent.
	// This is a design choice to follow the simple principle of one-message-at-a-time processing by actors.
	// This can hinder performance if it is not properly used.
	BatchAsk(ctx context.Context, to PID, messages ...proto.Message) (responses chan proto.Message, err error)
	// RemoteTell sends a message to an actor remotely without expecting any reply.
	RemoteTell(ctx context.Context, to *goaktpb.Address, message proto.Message) error
	// RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner.
	// Messages are processed one after the other in the order they are sent.
	RemoteBatchTell(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) error
	// RemoteAsk is used to send a message to an actor remotely and expect a response
	// immediately. With this type of message the receiver cannot communicate back to the sender
	// except by replying to the message with a response. This is one-way communication.
	RemoteAsk(ctx context.Context, to *goaktpb.Address, message proto.Message) (response *anypb.Any, err error)
	// RemoteBatchAsk sends a batch of messages synchronously to a remote actor and expects responses in the same order as the messages.
	// Messages are processed one after the other in the order they are sent.
	// This can hinder performance if it is not properly used.
	RemoteBatchAsk(ctx context.Context, to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any, err error)
	// RemoteLookup looks up an actor address on a remote node.
	RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)
	// RemoteReSpawn restarts an actor on a remote node.
	RemoteReSpawn(ctx context.Context, host string, port int, name string) error
	// RemoteStop stops an actor on a remote node.
	RemoteStop(ctx context.Context, host string, port int, name string) error
	// RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node
	// using the Register method of ActorSystem.
	RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error
	// Children returns the list of all the children of the given actor that are still alive,
	// or an empty list.
	Children() []PID
	// Child returns the named child actor if it is alive.
	Child(name string) (PID, error)
	// Stop forces the given child actor to terminate after it finishes processing its current message.
	// Nothing happens if the child is already stopped.
	Stop(ctx context.Context, pid PID) error
	// StashSize returns the stash buffer size.
	StashSize() uint64
	// Equals is a convenient method to compare two PIDs.
	Equals(to PID) bool
	// PipeTo processes a long-running task and pipes the result to the provided actor.
	// The successful result of the task will be put onto the provided actor mailbox.
	// This is useful when interacting with external services.
	// It is common to want to use the value of the response in the actor once the long-running task has completed.
	PipeTo(ctx context.Context, to PID, task future.Task) error
	// contains filtered or unexported methods
}

PID defines the various actions one can perform on a given actor

// NoSender means that there is no sender.
// It is declared here without an initializer.
var NoSender PID

NoSender means that there is no sender

type Path

type Path struct {
	// contains filtered or unexported fields
}

Path is a unique path to an actor

func NewPath

func NewPath(name string, address *Address) *Path

NewPath creates an immutable Path

func (*Path) Address

func (p *Path) Address() *Address

Address returns the address of the path

func (*Path) Equals

func (p *Path) Equals(x *Path) bool

Equals is used to compare two paths

func (*Path) ID

func (p *Path) ID() uuid.UUID

ID returns the internal unique id of the actor that this path refers to.

func (*Path) Name

func (p *Path) Name() string

Name returns the name of the actor that this path refers to.

func (*Path) Parent

func (p *Path) Parent() *Path

Parent returns the parent path

func (*Path) RemoteAddress

func (p *Path) RemoteAddress() *goaktpb.Address

RemoteAddress returns the remote address derived from the path

func (*Path) String

func (p *Path) String() string

String returns the string representation of an actorPath

func (*Path) WithParent

func (p *Path) WithParent(parent *Path) *Path

WithParent sets the parent actor path and returns a new path. This function does not modify the original path.

type PostStopFunc

// PostStopFunc is the signature of the PostStop hook used when creating an actor.
// It is a type alias for func(ctx context.Context) error.
type PostStopFunc = func(ctx context.Context) error

PostStopFunc defines the PostStopFunc hook for an actor creation

type PreStartFunc

// PreStartFunc is the signature of the PreStart hook used when creating an actor.
// It is a type alias for func(ctx context.Context) error.
type PreStartFunc = func(ctx context.Context) error

PreStartFunc defines the PreStartFunc hook for an actor creation

type ReceiveContext

type ReceiveContext interface {
	// Context returns the context attached to the message.
	Context() context.Context
	// Sender returns the sender of the message. For a remote message this is set to NoSender.
	Sender() PID
	// Self represents the actor receiving the message.
	Self() PID
	// Message is the actual message sent.
	Message() proto.Message
	// Response sets the message response.
	// Use this method within the Actor.Receive method of the actor to set a reply.
	// This can only be used with the request-response pattern. For asynchronous communication
	// this operation amounts to nothing.
	Response(resp proto.Message)
	// RemoteSender defines the remote sender of the message if it is a remote message.
	// This is set to RemoteNoSender when the message is not a remote message.
	RemoteSender() *goaktpb.Address
	// Become switches the current behavior of the actor to a new behavior.
	// The current message in process during the transition will still be processed with the current
	// behavior before the transition. However, subsequent messages will be processed with the new behavior.
	// One needs to call UnBecome to reset the actor behavior to the default one, which is the
	// Actor.Receive method.
	Become(behavior Behavior)
	// UnBecome resets the actor behavior to the default one, which is the
	// Actor.Receive method.
	UnBecome()
	// BecomeStacked sets a new behavior at the top of the actor's behavior stack, while maintaining the previous ones.
	// The current message in process during the transition will still be processed with the current
	// behavior before the transition. However, subsequent messages will be processed with the new behavior.
	// One needs to call UnBecomeStacked to go back to the actor's previous behavior.
	BecomeStacked(behavior Behavior)
	// UnBecomeStacked sets the actor behavior to the previous behavior before BecomeStacked was called.
	UnBecomeStacked()
	// Stash adds the current message to the stash buffer.
	Stash()
	// Unstash unstashes the oldest message in the stash and prepends it to the mailbox.
	Unstash()
	// UnstashAll unstashes all messages from the stash buffer and prepends them to the mailbox.
	// It keeps the messages in the same order as received, unstashing older messages before newer ones.
	UnstashAll()
	// Tell sends an asynchronous message to another PID.
	Tell(to PID, message proto.Message)
	// BatchTell sends an asynchronous bunch of messages to the given PID.
	// The messages will be processed one after the other in the order they are sent.
	// This is a design choice to follow the simple principle of one message at a time processing by actors.
	// When BatchTell is given a single message it falls back to a Tell call.
	BatchTell(to PID, messages ...proto.Message)
	// Ask sends a synchronous message to another actor and expects a response. This method is good when interacting with a child actor.
	// Ask has a timeout which can cause the sender to panic. When Ask times out, the receiving actor does not know and may still process the message.
	// It is recommended to set a good timeout to quickly receive a response and try to avoid false positives.
	Ask(to PID, message proto.Message) (response proto.Message)
	// BatchAsk sends a synchronous bunch of messages to the given PID and expects responses in the same order as the messages.
	// The messages will be processed one after the other in the order they are sent.
	// This is a design choice to follow the simple principle of one message at a time processing by actors.
	BatchAsk(to PID, messages ...proto.Message) (responses chan proto.Message)
	// Forward works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender.
	// As a result, the actor receiving the forwarded messages knows who the actual sender of the message is.
	// The message that is forwarded is the current message received by the receive context.
	// This operation does nothing when the receiving actor is not running.
	Forward(to PID)
	// RemoteTell sends a message to an actor remotely without expecting any reply.
	RemoteTell(to *goaktpb.Address, message proto.Message)
	// RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner.
	// Messages are processed one after the other in the order they are sent.
	RemoteBatchTell(to *goaktpb.Address, messages ...proto.Message)
	// RemoteAsk is used to send a message to an actor remotely and expect a response
	// immediately. Executing this within an actor can hinder performance because it is a blocking call.
	RemoteAsk(to *goaktpb.Address, message proto.Message) (response *anypb.Any)
	// RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expects responses in the same order as the messages.
	// Messages are processed one after the other in the order they are sent.
	// This can hinder performance if it is not properly used.
	RemoteBatchAsk(to *goaktpb.Address, messages ...proto.Message) (responses []*anypb.Any)
	// RemoteLookup looks for an actor address on a remote node. If the actorSystem is nil then the lookup will be done
	// using the same actor system as the PID actor system.
	// NOTE(review): this method takes no actorSystem argument, so the sentence above may be stale — confirm.
	RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)
	// Shutdown gracefully shuts down the given actor.
	// All current messages in the mailbox will be processed before the actor shuts down after a period of time
	// that can be configured. All child actors will be gracefully shut down.
	Shutdown()
	// Spawn creates a child actor or panics.
	Spawn(name string, actor Actor) PID
	// Children returns the list of all the children of the given actor.
	Children() []PID
	// Child returns the named child actor if it is alive.
	Child(name string) PID
	// Stop forces the given child actor to terminate after it finishes processing its current message.
	// Nothing happens if the child is already stopped.
	Stop(child PID)
	// Unhandled is used to handle unhandled messages instead of throwing an error.
	// This will push the given message into the deadletter queue.
	Unhandled()
	// RemoteReSpawn restarts an actor on a remote node.
	RemoteReSpawn(host string, port int, name string)
	// Err is used instead of panicking within a message handler.
	// One can also call panic, which is not the recommended way.
	Err(err error)
	// PipeTo processes a long-running task and pipes the result to the provided actor.
	// The successful result of the task will be put onto the provided actor mailbox.
	// This is useful when interacting with external services.
	// It's common to want to use the value of the response in the actor once the long-running task is completed.
	PipeTo(to PID, task future.Task)
}

ReceiveContext is the context that is used by the actor to receive messages

type ReceiveFunc

type ReceiveFunc = func(ctx context.Context, message proto.Message) error

ReceiveFunc is a message handling placeholder

type StrategyDirective

type StrategyDirective int

StrategyDirective represents the supervisor strategy directive

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL