actors

package
v2.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: MIT Imports: 45 Imported by: 21

Documentation

Index

Constants

View Source
const (
	// DefaultPassivationTimeout is the default passivation timeout (2 minutes).
	DefaultPassivationTimeout = 2 * time.Minute
	// DefaultInitMaxRetries is the default number of times actor initialization is retried.
	DefaultInitMaxRetries = 5
	// DefaultShutdownTimeout is the default shutdown timeout (1 minute).
	DefaultShutdownTimeout = time.Minute
	// DefaultInitTimeout is the default actor init timeout (1 second).
	DefaultInitTimeout = time.Second
	// DefaultPeerStateLoopInterval is the default peer state loop interval (10 seconds).
	DefaultPeerStateLoopInterval = 10 * time.Second

	// DefaultJanitorInterval is the default GC (janitor) interval (30 milliseconds).
	// The janitor helps clean up dead actors from the given actor system.
	DefaultJanitorInterval = 30 * time.Millisecond
)

Variables

View Source
var (
	// ErrInvalidActorSystemName is returned when the actor system name is invalid.
	ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')")
	// ErrDead means that the given actor is not alive.
	ErrDead = errors.New("actor is not alive")
	// ErrUnhandled is used when an actor cannot handle a given message.
	ErrUnhandled = errors.New("unhandled message")
	// ErrClusterDisabled is returned when cluster mode is not enabled while accessing cluster features.
	ErrClusterDisabled = errors.New("cluster is not enabled")
	// ErrUndefinedActor is returned when an actor is not defined.
	ErrUndefinedActor = errors.New("actor is not defined")
	// ErrRequestTimeout is returned when sending an Ask message times out.
	ErrRequestTimeout = errors.New("request timed out")
	// ErrRemotingDisabled is returned when remoting is not enabled.
	ErrRemotingDisabled = errors.New("remoting is not enabled")
	// ErrAddressNotFound builds the error returned when an actor address is not found.
	// The result is a connect error with code connect.CodeNotFound that names addr.
	ErrAddressNotFound = func(addr string) error {
		return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr))
	}
	// ErrRemoteSendFailure builds the error returned when sending a remote message fails.
	// It wraps err in a connect error with code connect.CodeInternal.
	ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) }
	// ErrNameRequired is used when the actor system name is required.
	ErrNameRequired = errors.New("actor system is required")
	// ErrInvalidInstance is returned when the creation of an actor instance fails.
	ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance")
	// ErrActorNotFound builds the error returned when the actor at actorPath is not found.
	ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) }
	// ErrMethodCallNotAllowed is returned when an rpc call is not allowed.
	ErrMethodCallNotAllowed = errors.New("method call not allowed")
	// ErrInvalidRemoteMessage builds the error returned when an invalid remote message is sent.
	// The given err is wrapped (%w), so it stays reachable via errors.Is / errors.As.
	ErrInvalidRemoteMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
	// ErrStashBufferNotSet is returned when messages are requested to be stashed while no stash buffer is set.
	ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer")
	// ErrInitFailure builds the error returned when the initialization of an actor fails.
	// The given err is wrapped (%w).
	ErrInitFailure = func(err error) error { return fmt.Errorf("failed to initialize: %w", err) }
	// ErrActorSystemNotStarted is returned when the actor system is not started while accessing its features.
	ErrActorSystemNotStarted = errors.New("actor system has not started yet")
	// ErrLocalAddress is returned when the given address is a local address.
	// NOTE(review): presumably raised where a remote address is expected — confirm against callers.
	ErrLocalAddress = errors.New("address is a local address")
	// ErrInstanceNotAnActor is returned when the created instance does not implement the Actor interface.
	ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface")
	// ErrTypeNotRegistered is returned when a given actor type is not registered.
	ErrTypeNotRegistered = errors.New("actor type is not registered")
	// ErrPeerNotFound is returned when a given peer cannot be found.
	ErrPeerNotFound = errors.New("peer is not found")
	// ErrUndefinedTask is returned when the long-running task whose result is to be piped to an actor is not defined.
	ErrUndefinedTask = errors.New("task is not defined")
	// ErrInvalidHost is returned when a request is sent to an invalid host.
	ErrInvalidHost = errors.New("invalid host")
	// ErrFullMailbox is returned when the mailbox is full.
	ErrFullMailbox = errors.New("mailbox is full")
	// ErrSchedulerNotStarted is returned when the scheduler has not started.
	ErrSchedulerNotStarted = errors.New("scheduler has not started")
	// ErrInvalidMessage builds the error returned when an invalid message is sent. The given err is wrapped (%w).
	// NOTE(review): the message text is identical to ErrInvalidRemoteMessage ("invalid remote message") — confirm this is intended.
	ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
	// ErrInvalidTimeout is returned when a given timeout is negative or zero.
	ErrInvalidTimeout = errors.New("invalid timeout")
)

Functions

func Ask

func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

Ask sends a synchronous message to another actor and expects a response. This blocks until a response is received or the request times out.

func BatchAsk

func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)

BatchAsk synchronously sends a batch of messages to the given PID and expects responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func BatchTell

func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error

BatchTell sends bulk asynchronous messages to an actor. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func Tell

func Tell(ctx context.Context, to *PID, message proto.Message) error

Tell sends an asynchronous message to an actor

Types

type Actor

type Actor interface {
	// PreStart pre-starts the actor. This function can be used to set up some database connections
	// or some sort of initialization before the actor starts processing messages.
	// When the initialization fails the actor will not be started.
	// Use this function to set any fields that will be needed before the actor starts.
	// This hook helps set the default values needed by any fields of the actor.
	PreStart(ctx context.Context) error
	// Receive processes any message dropped into the actor mailbox.
	// The receiver of any message can either reply to the sender of the message with a new message or reply to the message synchronously
	// by configuring the reply of the message. The latter approach is often used when an external service is communicating with the actor.
	// Note that actors can communicate synchronously as well, but doing so will hinder the performance of the system.
	Receive(ctx *ReceiveContext)
	// PostStop is executed when the actor is shutting down.
	// Messages that have not been processed yet are processed before the actor shuts down.
	// This helps free up resources.
	PostStop(ctx context.Context) error
}

Actor represents the Actor interface. This will be implemented by any user who wants to create an actor. Any implementation must be immutable, which means all fields must be private (unexported). Only make use of the PreStart hook to set the initial values.

type ActorSystem

type ActorSystem interface {
	// Name returns the actor system name.
	Name() string
	// Actors returns the list of Actors that are alive in the actor system.
	Actors() []*PID
	// Start starts the actor system.
	Start(ctx context.Context) error
	// Stop stops the actor system.
	Stop(ctx context.Context) error
	// Spawn creates an actor in the system and starts it.
	Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
	// SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks
	// in the given optional options.
	SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
	// in the given optional options.
	SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnRouter creates a new router. One can additionally set the router options.
	// A router is a special type of actor that helps distribute messages of the same type over a set of actors, so that messages can be processed in parallel.
	// A single actor will only process one message at a time.
	SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
	// Kill stops a given actor in the system.
	Kill(ctx context.Context, name string) error
	// ReSpawn recreates a given actor in the system.
	ReSpawn(ctx context.Context, name string) (*PID, error)
	// NumActors returns the total number of active actors in the system.
	NumActors() uint64
	// LocalActor returns the reference of a local actor.
	// A local actor is an actor that resides on the same node where the given actor system has started.
	LocalActor(actorName string) (*PID, error)
	// RemoteActor returns the address of a remote actor when cluster is enabled.
	// When the cluster mode is not enabled an actor not found error will be returned.
	// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
	RemoteActor(ctx context.Context, actorName string) (addr *address.Address, err error)
	// ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled.
	// When cluster mode is activated, the PID will be nil.
	// When remoting is enabled this method will return an error.
	// An actor not found error is returned when the actor is not found.
	ActorOf(ctx context.Context, actorName string) (addr *address.Address, pid *PID, err error)
	// InCluster states whether the actor system has started within a cluster of nodes.
	InCluster() bool
	// GetPartition returns the partition where a given actor is located.
	GetPartition(actorName string) uint64
	// Subscribe creates an event subscriber.
	Subscribe() (eventstream.Subscriber, error)
	// Unsubscribe unsubscribes a subscriber.
	Unsubscribe(subscriber eventstream.Subscriber) error
	// ScheduleOnce schedules a message that will be delivered to the receiver actor.
	// This will send the given message to the actor after the given interval.
	// The message will be sent once.
	ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
	// Schedule schedules a message that will be delivered to the receiver actor.
	// This will send the given message to the actor after the given interval.
	Schedule(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
	// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
	// This requires remoting to be enabled on the actor system.
	// This will send the given message to the actor after the given interval.
	// The message will be sent once.
	RemoteScheduleOnce(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error
	// RemoteSchedule schedules a message to be sent to a remote actor in the future.
	// This requires remoting to be enabled on the actor system.
	// This will send the given message to the actor after the given interval.
	RemoteSchedule(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error
	// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
	ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error
	// RemoteScheduleWithCron schedules a message to be sent to the actor at the given address in the future using a cron expression.
	RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *address.Address, cronExpression string) error
	// PeerAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
	// This address is empty when cluster mode is not activated.
	PeerAddress() string
	// Register registers an actor for future use. This is necessary when creating an actor remotely.
	Register(ctx context.Context, actor Actor) error
	// Deregister removes a registered actor from the registry.
	Deregister(ctx context.Context, actor Actor) error
	// Logger returns the logger set when creating the actor system.
	Logger() log.Logger
	// contains filtered or unexported methods
}

ActorSystem defines the contract of an actor system

func NewActorSystem

func NewActorSystem(name string, opts ...Option) (ActorSystem, error)

NewActorSystem creates an instance of ActorSystem

type Behavior

type Behavior func(ctx *ReceiveContext)

Behavior defines an actor behavior

type BoundedMailbox added in v2.7.0

type BoundedMailbox struct {
	// contains filtered or unexported fields
}

BoundedMailbox defines a bounded mailbox using a ring buffer queue. This mailbox is thread-safe.

func NewBoundedMailbox added in v2.7.0

func NewBoundedMailbox(cap int) *BoundedMailbox

NewBoundedMailbox creates a new instance of BoundedMailbox

func (*BoundedMailbox) Dequeue added in v2.7.0

func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext)

Dequeue takes the mail from the mailbox. It returns nil when the mailbox is empty.

func (*BoundedMailbox) Enqueue added in v2.7.0

func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error

Enqueue places the given value in the mailbox. This will return an error when the mailbox is full.

func (*BoundedMailbox) IsEmpty added in v2.7.0

func (mailbox *BoundedMailbox) IsEmpty() bool

IsEmpty returns true when the mailbox is empty

func (*BoundedMailbox) Len added in v2.7.0

func (mailbox *BoundedMailbox) Len() int64

Len returns queue length

type ClusterConfig added in v2.1.0

type ClusterConfig struct {
	// contains filtered or unexported fields
}

ClusterConfig defines the cluster mode settings

func NewClusterConfig added in v2.1.0

func NewClusterConfig() *ClusterConfig

NewClusterConfig creates an instance of ClusterConfig

func (*ClusterConfig) Discovery added in v2.1.0

func (x *ClusterConfig) Discovery() discovery.Provider

Discovery returns the discovery provider

func (*ClusterConfig) DiscoveryPort added in v2.6.2

func (x *ClusterConfig) DiscoveryPort() int

DiscoveryPort returns the discovery port

func (*ClusterConfig) Kinds added in v2.1.0

func (x *ClusterConfig) Kinds() []Actor

Kinds returns the actor kinds

func (*ClusterConfig) MinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) MinimumPeersQuorum() uint32

MinimumPeersQuorum returns the minimum peers quorum

func (*ClusterConfig) PartitionCount added in v2.1.0

func (x *ClusterConfig) PartitionCount() uint64

PartitionCount returns the partition count

func (*ClusterConfig) PeersPort added in v2.1.0

func (x *ClusterConfig) PeersPort() int

PeersPort returns the peers port

func (*ClusterConfig) ReplicaCount added in v2.1.0

func (x *ClusterConfig) ReplicaCount() uint32

ReplicaCount returns the replica count.

func (*ClusterConfig) Validate added in v2.1.0

func (x *ClusterConfig) Validate() error

Validate validates the cluster config

func (*ClusterConfig) WithDiscovery added in v2.1.0

func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig

WithDiscovery sets the cluster discovery provider

func (*ClusterConfig) WithDiscoveryPort added in v2.6.2

func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig

WithDiscoveryPort sets the discovery port

func (*ClusterConfig) WithKinds added in v2.1.0

func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig

WithKinds sets the cluster actor kinds

func (*ClusterConfig) WithMinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig

WithMinimumPeersQuorum sets the cluster config minimum peers quorum

func (*ClusterConfig) WithPartitionCount added in v2.1.0

func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig

WithPartitionCount sets the cluster config partition count

func (*ClusterConfig) WithPeersPort added in v2.1.0

func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig

WithPeersPort sets the peers port

func (*ClusterConfig) WithReplicaCount added in v2.1.0

func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig

WithReplicaCount sets the cluster replica count.

type FuncOption

type FuncOption interface {
	// Apply applies this option to the given func-based actor configuration.
	Apply(actor *funcConfig)
}

FuncOption is the interface that applies a SpawnHook option.

func WithFuncMailbox added in v2.7.0

func WithFuncMailbox(mailbox Mailbox) FuncOption

WithFuncMailbox sets the mailbox to use when starting the func-based actor

func WithPostStop

func WithPostStop(fn PostStopFunc) FuncOption

WithPostStop defines the PostStopFunc hook

func WithPreStart

func WithPreStart(fn PreStartFunc) FuncOption

WithPreStart defines the PreStartFunc hook

type Mailbox

type Mailbox interface {
	// Enqueue pushes a message into the mailbox. It returns an error
	// when the mailbox is full.
	Enqueue(msg *ReceiveContext) error
	// Dequeue fetches a message from the mailbox.
	Dequeue() (msg *ReceiveContext)
	// IsEmpty returns true when the mailbox is empty.
	IsEmpty() bool
	// Len returns the size of the mailbox.
	Len() int64
}

Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO

type Option

type Option interface {
	// Apply applies this configuration option to the given actor system.
	Apply(sys *actorSystem)
}

Option is the interface that applies a configuration option.

func WithActorInitMaxRetries

func WithActorInitMaxRetries(max int) Option

WithActorInitMaxRetries sets the number of times to retry an actor init process

func WithActorInitTimeout

func WithActorInitTimeout(timeout time.Duration) Option

WithActorInitTimeout sets the actor initialization (start) timeout. The timeout is given as a time.Duration.

func WithCluster added in v2.1.0

func WithCluster(config *ClusterConfig) Option

WithCluster enables the cluster mode

func WithClustering

func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, discoveryPort, peersPort int, kinds ...Actor) Option

WithClustering enables the cluster mode. Deprecated: use WithCluster instead, which offers a fluent API to set the cluster configuration.

func WithExpireActorAfter

func WithExpireActorAfter(duration time.Duration) Option

WithExpireActorAfter sets the actor expiry duration. After such duration an idle actor will be expired and removed from the actor system

func WithJanitorInterval added in v2.6.2

func WithJanitorInterval(interval time.Duration) Option

WithJanitorInterval sets the janitor interval

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the actor system custom log

func WithPartitionHasher

func WithPartitionHasher(hasher hash.Hasher) Option

WithPartitionHasher sets the partition hasher.

func WithPassivationDisabled

func WithPassivationDisabled() Option

WithPassivationDisabled disables the passivation mode

func WithPeerStateLoopInterval added in v2.2.1

func WithPeerStateLoopInterval(interval time.Duration) Option

WithPeerStateLoopInterval sets the peer state loop interval

func WithRemoting

func WithRemoting(host string, port int32) Option

WithRemoting enables remoting on the actor system

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) Option

WithShutdownTimeout sets the shutdown timeout. The timeout needs to be reasonable given the total number of actors the system will probably need. The graceful timeout is shared amongst all actors and child actors created in the system so that they can shut down gracefully via a cancellation context.

func WithStash

func WithStash() Option

WithStash enables the stash buffer. It takes no size argument.

func WithSupervisorDirective added in v2.4.0

func WithSupervisorDirective(directive SupervisorDirective) Option

WithSupervisorDirective sets the supervisor strategy directive that will be used globally when no supervisor directive is set

type OptionFunc

type OptionFunc func(*actorSystem)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(c *actorSystem)

type PID

type PID struct {
	// contains filtered or unexported fields
}

PID specifies an actor's unique process. With the PID one can send a ReceiveContext to the actor.

var (
	// NoSender means that there is no sender. It is declared without an initializer, so it is a nil *PID.
	NoSender *PID
	// DefaultSupervisoryStrategy defines the default supervisory strategy, which is the directive returned by NewStopDirective.
	DefaultSupervisoryStrategy = NewStopDirective()
)

func (*PID) Actor added in v2.6.2

func (pid *PID) Actor() Actor

Actor returns the underlying Actor

func (*PID) ActorSystem

func (pid *PID) ActorSystem() ActorSystem

ActorSystem returns the actor system

func (*PID) Address added in v2.6.2

func (pid *PID) Address() *address.Address

Address returns address of the actor

func (*PID) Ask

func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

Ask sends a synchronous message to another actor and expects a response. This blocks until a response is received or the request times out.

func (*PID) BatchAsk

func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error)

BatchAsk synchronously sends a batch of messages to the given PID and expects responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func (*PID) BatchTell

func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error

BatchTell asynchronously sends a batch of messages to the given PID. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounters a single message it will fall back to a Tell call.

func (*PID) Child

func (pid *PID) Child(name string) (*PID, error)

Child returns the named child actor if it is alive

func (*PID) Children

func (pid *PID) Children() []*PID

Children returns the list of all the direct children of the given actor. Only alive actors are included in the list; otherwise an empty list is returned.

func (*PID) ChildrenCount added in v2.7.0

func (pid *PID) ChildrenCount() int

ChildrenCount returns the total number of children for the given PID

func (*PID) Equals

func (pid *PID) Equals(to *PID) bool

Equals is a convenient method to compare two PIDs

func (*PID) ID

func (pid *PID) ID() string

ID is a convenience method that returns the actor's unique identifier. An actor's unique identifier is its address in the actor system.

func (*PID) IsRunning

func (pid *PID) IsRunning() bool

IsRunning returns true when the actor is alive and ready to process messages, and false when the actor is stopped or not started at all

func (*PID) LatestProcessedDuration added in v2.7.0

func (pid *PID) LatestProcessedDuration() time.Duration

LatestProcessedDuration returns the duration of the latest message processed

func (*PID) Logger added in v2.6.2

func (pid *PID) Logger() log.Logger

Logger returns the logger set when creating the PID

func (*PID) Name added in v2.1.0

func (pid *PID) Name() string

Name returns the actor given name

func (*PID) Parent added in v2.8.2

func (pid *PID) Parent() *PID

Parent returns the parent of this PID

func (*PID) PipeTo added in v2.2.0

func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error

PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor's mailbox. This is useful when interacting with external services. It is common that you would like to use the value of the response in the actor when the long-running task is completed.

func (*PID) ProcessedCount added in v2.7.0

func (pid *PID) ProcessedCount() int

ProcessedCount returns the total number of messages processed at a given time

func (*PID) RemoteAsk

func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)

RemoteAsk sends a synchronous message to another actor remotely and expects a response.

func (*PID) RemoteBatchAsk

func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)

RemoteBatchAsk synchronously sends a batch of messages to a remote actor and expects responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.

func (*PID) RemoteBatchTell

func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error

RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner. Messages are processed one after the other in the order they are sent.

func (*PID) RemoteLookup

func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)

RemoteLookup looks up an actor address on a remote node.

func (*PID) RemoteReSpawn

func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error

RemoteReSpawn restarts an actor on a remote node.

func (*PID) RemoteSpawn

func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error

RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem

func (*PID) RemoteStop

func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops an actor on a remote node

func (*PID) RemoteTell

func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error

RemoteTell sends a message to an actor remotely without expecting any reply

func (*PID) Restart

func (pid *PID) Restart(ctx context.Context) error

Restart restarts the actor. During restart all messages that are in the mailbox and not yet processed will be ignored

func (*PID) RestartCount added in v2.7.0

func (pid *PID) RestartCount() int

RestartCount returns the total number of re-starts by the given PID

func (*PID) SendAsync added in v2.6.2

func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.Message) error

SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.

func (*PID) SendSync added in v2.6.2

func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error)

SendSync sends a synchronous message to another actor and expects a response. The location of the given actor is transparent to the caller. This blocks until a response is received or the request times out.

func (*PID) Shutdown

func (pid *PID) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the given actor. All current messages in the mailbox will be processed before the actor shuts down after a period of time that can be configured. All child actors will be gracefully shut down.

func (*PID) SpawnChild

func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)

SpawnChild creates a child actor and starts watching it for errors. When the given child actor already exists, only its PID is returned.

func (*PID) StashSize

func (pid *PID) StashSize() uint64

StashSize returns the stash buffer size

func (*PID) Stop

func (pid *PID) Stop(ctx context.Context, cid *PID) error

Stop forces the given child actor (identified by its PID, cid) to terminate after it finishes processing its current message. Nothing happens if the child is already stopped.

func (*PID) Tell

func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error

Tell sends an asynchronous message to another PID

func (*PID) UnWatch

func (pid *PID) UnWatch(cid *PID)

UnWatch stops watching a given actor

func (*PID) Watch

func (pid *PID) Watch(cid *PID)

Watch watches a given actor for a Terminated message when the watched actor shuts down

type PostStopFunc

type PostStopFunc = func(ctx context.Context) error

PostStopFunc defines the PostStopFunc hook for an actor creation

type PreStartFunc

type PreStartFunc = func(ctx context.Context) error

PreStartFunc defines the PreStartFunc hook for an actor creation

type ReceiveContext

type ReceiveContext struct {
	// contains filtered or unexported fields
}

ReceiveContext is the context that is used by the actor to receive messages

func (*ReceiveContext) ActorSystem added in v2.8.2

func (rctx *ReceiveContext) ActorSystem() ActorSystem

ActorSystem returns the actor system

func (*ReceiveContext) Ask

func (rctx *ReceiveContext) Ask(to *PID, message proto.Message, timeout time.Duration) (response proto.Message)

Ask sends a synchronous message to another actor and expects a response. This method is good when interacting with a child actor. Ask has a timeout which can cause the sender to set the context error. When Ask times out, the receiving actor does not know and may still process the message. It is recommended to set a good timeout to quickly receive a response and try to avoid false positives.

func (*ReceiveContext) BatchAsk

func (rctx *ReceiveContext) BatchAsk(to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message)

BatchAsk synchronously sends a batch of messages to the given PID and expects responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func (*ReceiveContext) BatchTell

func (rctx *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)

BatchTell asynchronously sends a batch of messages to the given PID. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounters a single message it will fall back to a Tell call.

func (*ReceiveContext) Become

func (rctx *ReceiveContext) Become(behavior Behavior)

Become switches the current behavior of the actor to a new behavior

func (*ReceiveContext) BecomeStacked

func (rctx *ReceiveContext) BecomeStacked(behavior Behavior)

BecomeStacked sets a new behavior to the actor. The current message in process during the transition will still be processed with the current behavior before the transition. However, subsequent messages will be processed with the new behavior. One needs to call UnBecomeStacked to go to the actor's next behavior, which is the default behavior.

func (*ReceiveContext) Child

func (rctx *ReceiveContext) Child(name string) *PID

Child returns the named child actor if it is alive

func (*ReceiveContext) Children

func (rctx *ReceiveContext) Children() []*PID

Children returns the list of all the children of the given actor

func (*ReceiveContext) Context

func (rctx *ReceiveContext) Context() context.Context

Context represents the context attached to the message

func (*ReceiveContext) Err

func (rctx *ReceiveContext) Err(err error)

Err is used instead of panicking within a message handler.

func (*ReceiveContext) Forward

func (rctx *ReceiveContext) Forward(to *PID)

The Forward method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the receive context. This operation does nothing when the receiving actor has not started. This method can only be used on a single-node actor system because the actor reference PID is needed.

func (*ReceiveContext) ForwardTo added in v2.9.0

func (rctx *ReceiveContext) ForwardTo(actorName string)

The ForwardTo method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the receive context. This operation does nothing when the receiving actor has not started. This method can only be used when the actor system is in cluster mode because it is location transparent.

func (*ReceiveContext) Logger added in v2.8.2

func (rctx *ReceiveContext) Logger() log.Logger

Logger returns the logger used in the actor system

func (*ReceiveContext) Message

func (rctx *ReceiveContext) Message() proto.Message

Message is the actual message sent

func (*ReceiveContext) Parent added in v2.8.2

func (rctx *ReceiveContext) Parent() *PID

Parent returns the parent of the given actor in case the receiving actor is a child actor. This is just a convenience method.

func (*ReceiveContext) PipeTo added in v2.2.0

func (rctx *ReceiveContext) PipeTo(to *PID, task future.Task)

PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed

func (*ReceiveContext) RemoteAsk

func (rctx *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any)

RemoteAsk is used to send a message to an actor remotely and expect a response immediately.

func (*ReceiveContext) RemoteBatchAsk

func (rctx *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any)

RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expects responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.

func (*ReceiveContext) RemoteBatchTell

func (rctx *ReceiveContext) RemoteBatchTell(to *address.Address, messages []proto.Message)

RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner. Messages are processed one after the other in the order they are sent.

func (*ReceiveContext) RemoteForward added in v2.9.0

func (rctx *ReceiveContext) RemoteForward(to *address.Address)

RemoteForward method works similarly to the RemoteTell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This method can only be used when remoting is enabled on the running actor system

func (*ReceiveContext) RemoteLookup

func (rctx *ReceiveContext) RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)

RemoteLookup looks for an actor address on a remote node. If the actorSystem is nil then the lookup will be done using the same actor system as the PID actor system.

func (*ReceiveContext) RemoteReSpawn

func (rctx *ReceiveContext) RemoteReSpawn(host string, port int, name string)

RemoteReSpawn restarts an actor on a remote node.

func (*ReceiveContext) RemoteSender

func (rctx *ReceiveContext) RemoteSender() *address.Address

RemoteSender defines the remote sender of the message if it is a remote message. This is set to NoSender when the message is not a remote message.

func (*ReceiveContext) RemoteTell

func (rctx *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)

RemoteTell sends a message to an actor remotely without expecting any reply

func (*ReceiveContext) Response

func (rctx *ReceiveContext) Response(resp proto.Message)

Response sets the message response

func (*ReceiveContext) Self

func (rctx *ReceiveContext) Self() *PID

Self returns the receiver PID of the message

func (*ReceiveContext) SendAsync added in v2.6.2

func (rctx *ReceiveContext) SendAsync(actorName string, message proto.Message)

SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.

func (*ReceiveContext) SendSync added in v2.6.2

func (rctx *ReceiveContext) SendSync(actorName string, message proto.Message, timeout time.Duration) (response proto.Message)

SendSync sends a synchronous message to another actor and expects a response. The location of the given actor is transparent to the caller. This blocks until a response is received or the timeout expires.

func (*ReceiveContext) Sender

func (rctx *ReceiveContext) Sender() *PID

Sender of the message

func (*ReceiveContext) Shutdown

func (rctx *ReceiveContext) Shutdown()

Shutdown gracefully shuts down the given actor. All current messages in the mailbox will be processed before the actor shuts down, after a period of time that can be configured. All child actors will be gracefully shut down.

func (*ReceiveContext) Spawn

func (rctx *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID

Spawn creates a child actor or return error

func (*ReceiveContext) Stash

func (rctx *ReceiveContext) Stash()

Stash enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior

func (*ReceiveContext) Stop

func (rctx *ReceiveContext) Stop(child *PID)

Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped. However, it returns an error when the child cannot be stopped.

func (*ReceiveContext) Tell

func (rctx *ReceiveContext) Tell(to *PID, message proto.Message)

Tell sends an asynchronous message to another PID

func (*ReceiveContext) UnBecome

func (rctx *ReceiveContext) UnBecome()

UnBecome resets the actor behavior to the default one.

func (*ReceiveContext) UnBecomeStacked

func (rctx *ReceiveContext) UnBecomeStacked()

UnBecomeStacked sets the actor behavior to the next behavior before BecomeStacked was called

func (*ReceiveContext) UnWatch added in v2.8.2

func (rctx *ReceiveContext) UnWatch(cid *PID)

UnWatch stops watching a given actor

func (*ReceiveContext) Unhandled

func (rctx *ReceiveContext) Unhandled()

Unhandled is used to handle unhandled messages instead of throwing error

func (*ReceiveContext) Unstash

func (rctx *ReceiveContext) Unstash()

Unstash unstashes the oldest message in the stash and prepends to the mailbox

func (*ReceiveContext) UnstashAll

func (rctx *ReceiveContext) UnstashAll()

UnstashAll unstashes all messages from the stash buffer and prepends them to the mailbox. It keeps the messages in the same order as received, unstashing older messages before newer ones.

func (*ReceiveContext) Watch added in v2.8.2

func (rctx *ReceiveContext) Watch(cid *PID)

Watch watches a given actor for a Terminated message when the watched actor shuts down.

type ReceiveFunc

type ReceiveFunc = func(ctx context.Context, message proto.Message) error

ReceiveFunc is a message handling placeholder

type Remoting added in v2.8.0

type Remoting struct {
	// contains filtered or unexported fields
}

Remoting defines the Remoting APIs. This requires that Remoting is enabled on the connected actor system.

func NewRemoting added in v2.8.0

func NewRemoting() *Remoting

NewRemoting creates an instance of Remoting with an insecure connection. To use a secure connection one needs to call the WithTLS method of the remoting instance to set the certificates of the secure connection. This requires that Remoting is enabled on the connected actor system. Make sure to call Close to free up resources, otherwise you may be leaking socket connections.

One can also override the remoting option when calling any of the methods to supply a custom one.

func (*Remoting) Client added in v2.8.0

Client returns a Remoting service client instance

func (*Remoting) Close added in v2.8.0

func (r *Remoting) Close()

Close closes the Client connection

func (*Remoting) RemoteAsk added in v2.8.0

func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)

RemoteAsk sends a synchronous message to another actor remotely and expects a response.

func (*Remoting) RemoteBatchAsk added in v2.8.0

func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)

RemoteBatchAsk sends bulk messages to an actor with responses expected

func (*Remoting) RemoteBatchTell added in v2.8.0

func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error

RemoteBatchTell sends bulk asynchronous messages to an actor

func (*Remoting) RemoteLookup added in v2.8.0

func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)

RemoteLookup looks for an actor address on a remote node.

func (*Remoting) RemoteReSpawn added in v2.8.0

func (r *Remoting) RemoteReSpawn(ctx context.Context, host string, port int, name string) error

RemoteReSpawn restarts an actor on a remote node.

func (*Remoting) RemoteSpawn added in v2.8.0

func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error

RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem

func (*Remoting) RemoteStop added in v2.8.0

func (r *Remoting) RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops an actor on a remote node.

func (*Remoting) RemoteTell added in v2.8.0

func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error

RemoteTell sends a message to an actor remotely without expecting any reply

type RestartDirective

type RestartDirective struct {
	// contains filtered or unexported fields
}

RestartDirective defines the supervisor restart directive.

func NewRestartDirective added in v2.4.0

func NewRestartDirective() *RestartDirective

NewRestartDirective creates an instance of RestartDirective

func (*RestartDirective) MaxNumRetries added in v2.4.0

func (x *RestartDirective) MaxNumRetries() uint32

MaxNumRetries returns the max num retries

func (*RestartDirective) Timeout added in v2.4.0

func (x *RestartDirective) Timeout() time.Duration

Timeout returns the timeout

func (*RestartDirective) WithLimit added in v2.4.0

func (x *RestartDirective) WithLimit(maxNumRetries uint32, timeout time.Duration)

WithLimit sets the restart limit

type ResumeDirective added in v2.4.0

type ResumeDirective struct{}

ResumeDirective defines the supervisor resume directive. This ignores the failure and processes the next message instead.

func NewResumeDirective added in v2.4.0

func NewResumeDirective() *ResumeDirective

NewResumeDirective creates an instance of ResumeDirective

type RouterOption added in v2.4.0

type RouterOption interface {
	// Apply sets the Option value of a config.
	Apply(cl *router)
}

RouterOption is the interface that applies a configuration option.

func WithRoutingStrategy added in v2.4.0

func WithRoutingStrategy(strategy RoutingStrategy) RouterOption

WithRoutingStrategy sets the routing strategy

type RouterOptionFunc added in v2.4.0

type RouterOptionFunc func(router *router)

RouterOptionFunc implements the Option interface.

func (RouterOptionFunc) Apply added in v2.4.0

func (f RouterOptionFunc) Apply(c *router)

type RoutingStrategy added in v2.4.0

type RoutingStrategy int

RoutingStrategy defines the routing strategy to use when defining routers

const (
	RoundRobinRouting RoutingStrategy = iota
	RandomRouting
	FanOutRouting
)

type SpawnOption added in v2.7.0

type SpawnOption interface {
	// Apply sets the Option value of a config.
	Apply(config *spawnConfig)
}

SpawnOption is the interface that applies a spawn configuration option.

func WithMailbox

func WithMailbox(mailbox Mailbox) SpawnOption

WithMailbox sets the mailbox to use when starting the given actor

func WithSupervisor added in v2.8.1

func WithSupervisor(directive SupervisorDirective) SpawnOption

WithSupervisor sets the supervisor directive when starting the given actor.

type StopDirective

type StopDirective struct{}

StopDirective defines the supervisor stop directive.

func NewStopDirective added in v2.4.0

func NewStopDirective() *StopDirective

NewStopDirective creates an instance of StopDirective

type SupervisorDirective added in v2.4.0

type SupervisorDirective interface {
	// contains filtered or unexported methods
}

SupervisorDirective defines the supervisor directive.

type UnboundedMailbox added in v2.7.0

type UnboundedMailbox struct {
	// contains filtered or unexported fields
}

UnboundedMailbox is a Multi-Producer-Single-Consumer Queue (FIFO). Reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html

func NewUnboundedMailbox added in v2.7.0

func NewUnboundedMailbox() *UnboundedMailbox

NewUnboundedMailbox creates an instance of UnboundedMailbox.

func (*UnboundedMailbox) Dequeue added in v2.7.0

func (m *UnboundedMailbox) Dequeue() *ReceiveContext

Dequeue takes the mail from the mailbox. Returns nil if the queue is empty. Can be used in a single consumer (goroutine) only.

func (*UnboundedMailbox) Enqueue added in v2.7.0

func (m *UnboundedMailbox) Enqueue(value *ReceiveContext) error

Enqueue places the given value in the mailbox

func (*UnboundedMailbox) IsEmpty added in v2.7.0

func (m *UnboundedMailbox) IsEmpty() bool

IsEmpty returns true when the queue is empty

func (*UnboundedMailbox) Len added in v2.7.0

func (m *UnboundedMailbox) Len() int64

Len returns queue length

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL