actors

package
v2.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2025 License: MIT Imports: 59 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// DefaultPassivationTimeout is the default passivation timeout: 2 minutes.
	DefaultPassivationTimeout = 2 * time.Minute
	// DefaultInitMaxRetries is the default number of times an actor
	// initialization is retried: 5.
	DefaultInitMaxRetries = 5
	// DefaultShutdownTimeout is the default shutdown timeout: 1 minute.
	DefaultShutdownTimeout = time.Minute
	// DefaultInitTimeout is the default actor init timeout: 1 second.
	DefaultInitTimeout = time.Second
	// DefaultPeerStateLoopInterval is the default peer state loop interval: 10 seconds.
	DefaultPeerStateLoopInterval = 10 * time.Second
	// DefaultAskTimeout is the default ask timeout: 5 seconds.
	DefaultAskTimeout = 5 * time.Second

	// DefaultJanitorInterval is the default janitor (cleanup) interval: 30 milliseconds.
	// The janitor helps clean up dead actors from the given actor system.
	DefaultJanitorInterval = 30 * time.Millisecond
)

Variables

View Source
var (
	// ErrInvalidActorSystemName is returned when the actor system name is invalid.
	// A valid name contains only word characters ([a-zA-Z0-9]) plus non-leading '-' or '_'.
	ErrInvalidActorSystemName = errors.New("invalid ActorSystem name, must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')")
	// ErrDead means that the given actor is not alive.
	ErrDead = errors.New("actor is not alive")
	// ErrUnhandled is used when an actor cannot handle a given message.
	ErrUnhandled = errors.New("unhandled message")
	// ErrClusterDisabled is returned when cluster mode is not enabled while accessing cluster features.
	ErrClusterDisabled = errors.New("cluster is not enabled")
	// ErrUndefinedActor is returned when an actor is not defined.
	ErrUndefinedActor = errors.New("actor is not defined")
	// ErrRequestTimeout is returned when sending an Ask message times out.
	ErrRequestTimeout = errors.New("request timed out")
	// ErrRemotingDisabled is returned when remoting is not enabled.
	ErrRemotingDisabled = errors.New("remoting is not enabled")
	// ErrAddressNotFound builds the error returned when an actor address is not found.
	// The result is a connect error with code CodeNotFound whose message names addr.
	ErrAddressNotFound = func(addr string) error {
		return connect.NewError(connect.CodeNotFound, fmt.Errorf("actor=%s not found", addr))
	}
	// ErrRemoteSendFailure builds the error returned when sending a remote message fails.
	// The given err is wrapped in a connect error with code CodeInternal.
	ErrRemoteSendFailure = func(err error) error { return connect.NewError(connect.CodeInternal, err) }
	// ErrNameRequired is used when the actor system name is required.
	ErrNameRequired = errors.New("actor system is required")
	// ErrInvalidInstance is returned when the creation of an actor instance fails.
	ErrInvalidInstance = errors.New("failed to create instance. Reason: invalid instance")
	// ErrActorNotFound builds the error returned when an actor is not found.
	// The message names the given actorPath.
	ErrActorNotFound = func(actorPath string) error { return fmt.Errorf("actor=%s not found", actorPath) }
	// ErrMethodCallNotAllowed is returned when an rpc call is not allowed.
	ErrMethodCallNotAllowed = errors.New("method call not allowed")
	// ErrInvalidRemoteMessage builds the error returned when an invalid remote message is sent.
	// The given err is wrapped with %w, so it remains reachable via errors.Is/errors.As.
	ErrInvalidRemoteMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
	// ErrStashBufferNotSet is returned when messages are requested to be stashed
	// but the actor is not set up with a stash buffer.
	ErrStashBufferNotSet = errors.New("actor is not setup with a stash buffer")
	// ErrInitFailure builds the error returned when the initialization of an actor fails.
	// The given err is wrapped with %w.
	ErrInitFailure = func(err error) error { return fmt.Errorf("failed to initialize: %w", err) }
	// ErrActorSystemNotStarted is returned when the actor system is not started
	// while accessing the features of the actor system.
	ErrActorSystemNotStarted = errors.New("actor system has not started yet")
	// ErrLocalAddress reports that the given address is a local address.
	// NOTE(review): presumably returned when a local address is supplied where a
	// remote address is expected — confirm against the callers.
	ErrLocalAddress = errors.New("address is a local address")
	// ErrInstanceNotAnActor is returned when the created instance does not implement the Actor interface.
	ErrInstanceNotAnActor = errors.New("failed to create instance. Reason: instance does not implement the Actor interface")
	// ErrTypeNotRegistered is returned when a given actor type is not registered.
	ErrTypeNotRegistered = errors.New("actor type is not registered")
	// ErrPeerNotFound is returned when a given peer cannot be located.
	ErrPeerNotFound = errors.New("peer is not found")
	// ErrUndefinedTask is returned when piping a long-running task result to an actor
	// and the task is not defined.
	ErrUndefinedTask = errors.New("task is not defined")
	// ErrInvalidHost is returned when a request is sent to an invalid host.
	ErrInvalidHost = errors.New("invalid host")
	// ErrFullMailbox is returned when the mailbox is full.
	ErrFullMailbox = errors.New("mailbox is full")
	// ErrSchedulerNotStarted is returned when the scheduler has not started.
	ErrSchedulerNotStarted = errors.New("scheduler has not started")
	// ErrInvalidMessage builds the error returned when an invalid message is sent.
	// The given err is wrapped with %w. Note that the resulting text is the same
	// "invalid remote message" prefix produced by ErrInvalidRemoteMessage.
	ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) }
	// ErrInvalidTimeout is returned when a given timeout is negative or zero.
	ErrInvalidTimeout = errors.New("invalid timeout")
	// ErrPriorityMessageRequired is returned when a non-priority message is used in a priority mailbox.
	ErrPriorityMessageRequired = errors.New("priority message type is required")
	// ErrActorAlreadyExists builds the error returned when trying to create the same actor more than once.
	// The message names the given actorName.
	ErrActorAlreadyExists = func(actorName string) error { return fmt.Errorf("actor=(%s) already exists", actorName) }
	// ErrInvalidTLSConfiguration is returned when the TLS configuration is not properly set.
	ErrInvalidTLSConfiguration = errors.New("TLS configuration is invalid")
)

DefaultSupervisorStrategies defines the default supervisor strategies

Functions

func Ask

func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

Ask sends a synchronous message to another actor and expects a response. It blocks until a response is received or the timeout elapses.

func BatchAsk

func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...proto.Message) (responses chan proto.Message, err error)

BatchAsk sends a batch of messages synchronously to the given PID and expects responses in the same order as the messages. The messages are processed one after the other in the order they are sent. This is a design choice that follows the simple principle of actors processing one message at a time.

func BatchTell

func BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error

BatchTell sends a batch of messages asynchronously to an actor. The messages are processed one after the other in the order they are sent. This is a design choice that follows the simple principle of actors processing one message at a time.

func Tell

func Tell(ctx context.Context, to *PID, message proto.Message) error

Tell sends an asynchronous message to an actor

Types

type Actor

type Actor interface {
	// PreStart pre-starts the actor. It runs before the actor starts processing messages
	// and can be used to set up resources such as database connections or to perform
	// any other initialization.
	// When the initialization fails the actor will not be started.
	// Use this hook to set the default values of any fields the actor needs before it starts.
	PreStart(ctx context.Context) error
	// Receive processes any message dropped into the actor mailbox.
	// The receiver of a message can either reply to the sender with a new message or reply
	// to the message synchronously by setting the reply on the message. The latter approach
	// is often used when an external service is communicating with the actor.
	// Actors can communicate synchronously as well, but doing so will hinder the performance of the system.
	Receive(ctx *ReceiveContext)
	// PostStop is executed when the actor is shutting down.
	// It runs once the messages that have not been processed yet have been processed,
	// before the actor shuts down.
	// This helps free up resources.
	PostStop(ctx context.Context) error
}

Actor represents the Actor interface. It is implemented by any user who wants to create an actor. Any implementation must be immutable, which means all fields must be private (unexported). Only use the PreStart hook to set the initial values.

type ActorMetric added in v2.11.1

type ActorMetric struct {
	// contains filtered or unexported fields
}

ActorMetric defines actor specific metrics

func (ActorMetric) ChidrenCount added in v2.11.1

func (x ActorMetric) ChidrenCount() uint64

ChidrenCount returns the total number of child actors of a given PID

func (ActorMetric) DeadlettersCount added in v2.11.1

func (x ActorMetric) DeadlettersCount() uint64

DeadlettersCount returns the total number of deadletters

func (ActorMetric) LatestProcessedDuration added in v2.11.1

func (x ActorMetric) LatestProcessedDuration() time.Duration

LatestProcessedDuration returns the duration of the latest message processed

func (ActorMetric) ProcessedCount added in v2.11.1

func (x ActorMetric) ProcessedCount() uint64

ProcessedCount returns the total number of messages processed at a given time

func (ActorMetric) RestartCount added in v2.11.1

func (x ActorMetric) RestartCount() uint64

RestartCount returns the total number of re-starts by the given PID

func (ActorMetric) StashSize added in v2.11.1

func (x ActorMetric) StashSize() uint64

StashSize returns the stash size at a given time

func (ActorMetric) Uptime added in v2.11.1

func (x ActorMetric) Uptime() int64

Uptime returns the number of seconds since the actor/system started

type ActorRef added in v2.11.1

type ActorRef struct {
	// contains filtered or unexported fields
}

ActorRef defines the information about a given actor. The following information is captured by the ActorRef:

  • Name: The given actor name, which is unique both locally and in a cluster environment. Actor names may contain only word characters, that is: [a-zA-Z0-9] plus non-leading '-' or '_'.

  • Kind: The actor kind returns the reflected type of the underlying Actor backing the given actor reference.

  • Address: The actor address. One can use the address with Remoting to interact with the actor by sending messages.

func (ActorRef) Address added in v2.11.1

func (x ActorRef) Address() *address.Address

Address represents the actor address

func (ActorRef) Equals added in v2.11.1

func (x ActorRef) Equals(actor ActorRef) bool

Equals is a convenient method to compare two ActorRef

func (ActorRef) Kind added in v2.11.1

func (x ActorRef) Kind() string

Kind represents the actor kind

func (ActorRef) Name added in v2.11.1

func (x ActorRef) Name() string

Name represents the actor given name

type ActorSystem

type ActorSystem interface {
	// Metric returns the actor system metric.
	// The metric does not include any cluster data.
	Metric(ctx context.Context) *Metric
	// Name returns the actor system name.
	Name() string
	// Actors returns the list of Actors that are alive on a given running node.
	// This does not account for the total number of actors in the cluster.
	Actors() []*PID
	// ActorRefs retrieves a list of active actors, including both local actors
	// and, when cluster mode is enabled, actors across the cluster. Use this
	// method cautiously, as the scanning process may impact system performance.
	// If the cluster request fails, only locally active actors will be returned.
	// The timeout parameter defines the maximum duration for cluster-based requests
	// before they are terminated.
	ActorRefs(ctx context.Context, timeout time.Duration) []ActorRef
	// Start initializes the actor system.
	// To guarantee a clean shutdown during unexpected system terminations,
	// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop.
	Start(ctx context.Context) error
	// Stop stops the actor system. It does not terminate the program:
	// one needs to explicitly call os.Exit to terminate the program.
	Stop(ctx context.Context) error
	// Spawn creates an actor in the system and starts it.
	Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
	// SpawnFromFunc creates an actor with the given receive function. One can set the
	// PreStart and PostStop lifecycle hooks through the optional options.
	SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnNamedFromFunc creates an actor with the given receive function and the provided name.
	// One can set the PreStart and PostStop lifecycle hooks through the optional options.
	SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnRouter creates a new router. One can additionally set the router options.
	// A router is a special type of actor that helps distribute messages of the same type
	// over a set of actors, so that messages can be processed in parallel.
	// A single actor will only process one message at a time.
	SpawnRouter(ctx context.Context, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
	// Kill stops a given actor in the system.
	Kill(ctx context.Context, name string) error
	// ReSpawn recreates a given actor in the system.
	// During restart all messages that are in the mailbox and not yet processed will be ignored.
	// Only the direct alive children of the given actor will be shut down and respawned with their initial state.
	// Bear in mind that restarting an actor will reinitialize the actor to its initial state.
	// In case the restart of any direct child fails, the given actor will not be started at all.
	ReSpawn(ctx context.Context, name string) (*PID, error)
	// NumActors returns the total number of active actors on a given running node.
	// This does not account for the total number of actors in the cluster.
	NumActors() uint64
	// LocalActor returns the reference of a local actor.
	// A local actor is an actor that resides on the same node where the given actor system has started.
	LocalActor(actorName string) (*PID, error)
	// RemoteActor returns the address of a remote actor when cluster is enabled.
	// When the cluster mode is not enabled an actor not found error will be returned.
	// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
	RemoteActor(ctx context.Context, actorName string) (addr *address.Address, err error)
	// ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled.
	// When cluster mode is activated, the PID will be nil.
	// When remoting is enabled this method will return an error.
	// An actor not found error is returned when the actor is not found.
	// NOTE(review): the remoting sentence reads oddly (possibly meant "not enabled") — confirm against the implementation.
	ActorOf(ctx context.Context, actorName string) (addr *address.Address, pid *PID, err error)
	// InCluster states whether the actor system has started within a cluster of nodes.
	InCluster() bool
	// GetPartition returns the partition where a given actor is located.
	GetPartition(actorName string) int
	// Subscribe creates an event subscriber to consume events from the actor system.
	// Remember to use the Unsubscribe method to avoid resource leakage.
	Subscribe() (eventstream.Subscriber, error)
	// Unsubscribe unsubscribes a subscriber.
	Unsubscribe(subscriber eventstream.Subscriber) error
	// ScheduleOnce schedules a message that will be delivered to the receiver actor.
	// The given message is sent to the actor after the specified interval.
	// The message will be sent once.
	ScheduleOnce(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
	// Schedule schedules a message that will be delivered to the receiver actor.
	// The given message is sent to the actor after the specified interval.
	// NOTE(review): unlike ScheduleOnce, single delivery is not stated here; presumably
	// the message is sent repeatedly at the given interval — confirm.
	Schedule(ctx context.Context, message proto.Message, pid *PID, interval time.Duration) error
	// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
	// This requires remoting to be enabled on the actor system.
	// The given message is sent to the actor after the specified interval.
	// The message will be sent once.
	RemoteScheduleOnce(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error
	// RemoteSchedule schedules a message to be sent to a remote actor in the future.
	// This requires remoting to be enabled on the actor system.
	// The given message is sent to the actor after the specified interval.
	RemoteSchedule(ctx context.Context, message proto.Message, address *address.Address, interval time.Duration) error
	// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
	ScheduleWithCron(ctx context.Context, message proto.Message, pid *PID, cronExpression string) error
	// RemoteScheduleWithCron schedules a message to be sent to the actor at the given
	// address in the future using a cron expression.
	RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *address.Address, cronExpression string) error
	// PeerAddress returns the actor system address known in the cluster.
	// That address is used by other nodes to communicate with the actor system.
	// This address is empty when cluster mode is not activated.
	PeerAddress() string
	// Register registers an actor for future use. This is necessary when creating an actor remotely.
	Register(ctx context.Context, actor Actor) error
	// Deregister removes a registered actor from the registry.
	Deregister(ctx context.Context, actor Actor) error
	// Logger returns the logger set when creating the actor system.
	Logger() log.Logger
	// Host returns the actor system node host address.
	// This is the bind address for remote communication.
	Host() string
	// Port returns the actor system node port.
	// This is the bind port for remote communication.
	Port() int
	// Uptime returns the number of seconds since the actor system started.
	Uptime() int64
	// Running returns true when the actor system is running.
	Running() bool
	// Run starts the actor system, blocks on the signals channel, and then
	// gracefully shuts the application down.
	// It's designed to make typical applications simple to run.
	// All of Run's functionality is implemented in terms of the exported
	// Start and Stop methods. Applications with more specialized needs
	// can use those methods directly instead of relying on Run.
	Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error)
	// contains filtered or unexported methods
}

ActorSystem defines the contract of an actor system

func NewActorSystem

func NewActorSystem(name string, opts ...Option) (ActorSystem, error)

NewActorSystem creates an instance of ActorSystem

type Behavior

// Behavior defines an actor behavior: a function invoked with the ReceiveContext
// of a message.
type Behavior func(ctx *ReceiveContext)

Behavior defines an actor behavior

type BoundedMailbox added in v2.7.0

type BoundedMailbox struct {
	// contains filtered or unexported fields
}

BoundedMailbox defines a bounded mailbox backed by a ring buffer queue. This mailbox is thread-safe.

func NewBoundedMailbox added in v2.7.0

func NewBoundedMailbox(capacity int) *BoundedMailbox

NewBoundedMailbox creates a new instance BoundedMailbox

func (*BoundedMailbox) Dequeue added in v2.7.0

func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext)

Dequeue takes the mail from the mailbox. It returns nil when the mailbox is empty.

func (*BoundedMailbox) Dispose added in v2.10.1

func (mailbox *BoundedMailbox) Dispose()

Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.

func (*BoundedMailbox) Enqueue added in v2.7.0

func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error

Enqueue places the given value in the mailbox. It returns an error when the mailbox is full.

func (*BoundedMailbox) IsEmpty added in v2.7.0

func (mailbox *BoundedMailbox) IsEmpty() bool

IsEmpty returns true when the mailbox is empty

func (*BoundedMailbox) Len added in v2.7.0

func (mailbox *BoundedMailbox) Len() int64

Len returns queue length

type ClusterConfig added in v2.1.0

type ClusterConfig struct {
	// contains filtered or unexported fields
}

ClusterConfig defines the cluster mode settings

func NewClusterConfig added in v2.1.0

func NewClusterConfig() *ClusterConfig

NewClusterConfig creates an instance of ClusterConfig

func (*ClusterConfig) Discovery added in v2.1.0

func (x *ClusterConfig) Discovery() discovery.Provider

Discovery returns the discovery provider

func (*ClusterConfig) DiscoveryPort added in v2.6.2

func (x *ClusterConfig) DiscoveryPort() int

DiscoveryPort returns the discovery port

func (*ClusterConfig) KVStoreSize added in v2.12.0

func (x *ClusterConfig) KVStoreSize() uint64

KVStoreSize returns the cluster storage size

func (*ClusterConfig) Kinds added in v2.1.0

func (x *ClusterConfig) Kinds() []Actor

Kinds returns the actor kinds

func (*ClusterConfig) MinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) MinimumPeersQuorum() uint32

MinimumPeersQuorum returns the minimum peers quorum

func (*ClusterConfig) PartitionCount added in v2.1.0

func (x *ClusterConfig) PartitionCount() uint64

PartitionCount returns the partition count

func (*ClusterConfig) PeersPort added in v2.1.0

func (x *ClusterConfig) PeersPort() int

PeersPort returns the peers port

func (*ClusterConfig) ReadQuorum added in v2.11.0

func (x *ClusterConfig) ReadQuorum() uint32

ReadQuorum returns the read quorum

func (*ClusterConfig) ReplicaCount added in v2.1.0

func (x *ClusterConfig) ReplicaCount() uint32

ReplicaCount returns the replica count.

func (*ClusterConfig) Validate added in v2.1.0

func (x *ClusterConfig) Validate() error

Validate validates the cluster config

func (*ClusterConfig) WAL added in v2.13.0

func (x *ClusterConfig) WAL() string

WAL returns the WAL directory

func (*ClusterConfig) WithDiscovery added in v2.1.0

func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig

WithDiscovery sets the cluster discovery provider

func (*ClusterConfig) WithDiscoveryPort added in v2.6.2

func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig

WithDiscoveryPort sets the discovery port

func (*ClusterConfig) WithKVStoreSize added in v2.12.0

func (x *ClusterConfig) WithKVStoreSize(size uint64) *ClusterConfig

WithKVStoreSize sets the key/value in-memory storage size. The default value is 20MB.

func (*ClusterConfig) WithKinds added in v2.1.0

func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig

WithKinds sets the cluster actor kinds

func (*ClusterConfig) WithMinimumPeersQuorum added in v2.1.0

func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig

WithMinimumPeersQuorum sets the cluster config minimum peers quorum

func (*ClusterConfig) WithPartitionCount added in v2.1.0

func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig

WithPartitionCount sets the cluster config partition count. The partition count should be a prime number. ref: https://medium.com/swlh/why-should-the-length-of-your-hash-table-be-a-prime-number-760ec65a75d1

func (*ClusterConfig) WithPeersPort added in v2.1.0

func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig

WithPeersPort sets the peers port

func (*ClusterConfig) WithReadQuorum added in v2.11.0

func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig

WithReadQuorum sets the read quorum. Note: setting this field assumes you have advanced knowledge of quorum-based replica control. The default value should be sufficient for most use cases.

func (*ClusterConfig) WithReplicaCount added in v2.1.0

func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig

WithReplicaCount sets the cluster replica count. Note: setting this field assumes you have advanced knowledge of quorum-based replica control.

func (*ClusterConfig) WithWAL added in v2.13.0

func (x *ClusterConfig) WithWAL(dir string) *ClusterConfig

WithWAL sets a custom WAL directory. GoAkt is required to have the permission to create this directory.

func (*ClusterConfig) WithWriteQuorum added in v2.11.0

func (x *ClusterConfig) WithWriteQuorum(count uint32) *ClusterConfig

WithWriteQuorum sets the write quorum. Note: setting this field assumes you have advanced knowledge of quorum-based replica control. The default value should be sufficient for most use cases.

func (*ClusterConfig) WriteQuorum added in v2.11.0

func (x *ClusterConfig) WriteQuorum() uint32

WriteQuorum returns the write quorum

type FuncOption

// FuncOption is an option applied to the configuration (funcConfig) of a
// func-based actor, such as the mailbox or the PreStart/PostStop hooks.
type FuncOption interface {
	// Apply sets the option value on the given config.
	Apply(actor *funcConfig)
}

FuncOption is the interface that applies a SpawnHook option.

func WithFuncMailbox added in v2.7.0

func WithFuncMailbox(mailbox Mailbox) FuncOption

WithFuncMailbox sets the mailbox to use when starting the func-based actor

func WithPostStop

func WithPostStop(fn PostStopFunc) FuncOption

WithPostStop defines the PostStopFunc hook

func WithPreStart

func WithPreStart(fn PreStartFunc) FuncOption

WithPreStart defines the PreStartFunc hook

type InternalError added in v2.11.0

type InternalError struct {
	// contains filtered or unexported fields
}

InternalError defines an error that is explicit to the application

func NewInternalError added in v2.11.0

func NewInternalError(err error) InternalError

NewInternalError returns an instance of InternalError

func (InternalError) Error added in v2.11.0

func (i InternalError) Error() string

Error implements the standard error interface

type Mailbox

// Mailbox defines the actor mailbox: a queue of ReceiveContext messages.
// Any implementation should be a thread-safe FIFO.
type Mailbox interface {
	// Enqueue pushes a message into the mailbox. It returns an error
	// when the mailbox is full.
	Enqueue(msg *ReceiveContext) error
	// Dequeue fetches a message from the mailbox.
	Dequeue() (msg *ReceiveContext)
	// IsEmpty returns true when the mailbox is empty.
	IsEmpty() bool
	// Len returns the size of the mailbox.
	Len() int64
	// Dispose will dispose of this queue and free any blocked threads
	// in the Enqueue and/or Dequeue methods.
	Dispose()
}

Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO

type Metric added in v2.11.1

type Metric struct {
	// contains filtered or unexported fields
}

Metric defines the actor system metric

func (Metric) ActorsCount added in v2.11.1

func (m Metric) ActorsCount() int64

ActorsCount returns the total number of actors, either in the system or, given a specific PID, the total number of its child actors

func (Metric) DeadlettersCount added in v2.11.1

func (m Metric) DeadlettersCount() int64

DeadlettersCount returns the total number of deadletters

func (Metric) Uptime added in v2.11.1

func (m Metric) Uptime() int64

Uptime returns the number of seconds since the actor/system started

type Option

// Option is the interface that applies a configuration option to an actor system.
type Option interface {
	// Apply sets the option value on the given actor system.
	Apply(sys *actorSystem)
}

Option is the interface that applies a configuration option.

func WithActorInitMaxRetries

func WithActorInitMaxRetries(value int) Option

WithActorInitMaxRetries sets the number of times to retry an actor init process

func WithActorInitTimeout

func WithActorInitTimeout(timeout time.Duration) Option

WithActorInitTimeout sets how long an actor is given to start before the start times out

func WithCluster added in v2.1.0

func WithCluster(config *ClusterConfig) Option

WithCluster enables the cluster mode

func WithCoordinatedShutdown added in v2.11.0

func WithCoordinatedShutdown(hooks ...ShutdownHook) Option

WithCoordinatedShutdown registers internal and user-defined tasks to be executed during the shutdown process. The defined tasks will be executed in the same order of insertion. Any failure will halt the shutdown process.

func WithExpireActorAfter

func WithExpireActorAfter(duration time.Duration) Option

WithExpireActorAfter sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.

func WithJanitorInterval added in v2.6.2

func WithJanitorInterval(interval time.Duration) Option

WithJanitorInterval sets the janitor interval

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the actor system custom logger

func WithPartitionHasher

func WithPartitionHasher(hasher hash.Hasher) Option

WithPartitionHasher sets the partition hasher.

func WithPassivationDisabled

func WithPassivationDisabled() Option

WithPassivationDisabled disables the passivation mode

func WithPeerStateLoopInterval added in v2.2.1

func WithPeerStateLoopInterval(interval time.Duration) Option

WithPeerStateLoopInterval sets the peer state loop interval

func WithRemote added in v2.12.0

func WithRemote(config *remote.Config) Option

WithRemote enables remoting on the actor system

func WithRemoting

func WithRemoting(host string, port int32) Option

WithRemoting enables remoting on the actor system. Deprecated: use WithRemote, which provides better remoting configuration and optimization.

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) Option

WithShutdownTimeout sets the shutdown timeout. The timeout needs to be reasonable given the total number of actors the system will likely need. The graceful timeout is shared amongst all actors and child actors created in the system, which shut down gracefully via a cancellation context.

func WithStash

func WithStash() Option

WithStash enables the stash buffer. It takes no argument, so no buffer size is set through this option.

func WithTLS added in v2.12.0

func WithTLS(tlsInfo *TLSInfo) Option

WithTLS configures TLS settings for both the Server and Client. Ensure that both the Server and Client are configured with the same root Certificate Authority (CA) to enable successful handshake and mutual authentication.

In cluster mode, all nodes must share the same root CA to establish secure communication and complete handshakes successfully.

type OptionFunc

// OptionFunc is a function type that takes an *actorSystem. Through its Apply
// method it implements the Option interface.
type OptionFunc func(*actorSystem)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(c *actorSystem)

type PID

type PID struct {
	// contains filtered or unexported fields
}

PID specifies an actor's unique process. With the PID one can send a ReceiveContext to the actor.

var (

	// NoSender means that there is no sender.
	// It is declared without an initial value, so it is a nil *PID.
	NoSender *PID
	// DefaultSupervisoryStrategy defines the default supervisory strategy:
	// the directive returned by NewStopDirective.
	DefaultSupervisoryStrategy = NewStopDirective()
)

func (*PID) Actor added in v2.6.2

func (pid *PID) Actor() Actor

Actor returns the underlying Actor

func (*PID) ActorSystem

func (pid *PID) ActorSystem() ActorSystem

ActorSystem returns the actor system

func (*PID) Address added in v2.6.2

func (pid *PID) Address() *address.Address

Address returns address of the actor

func (*PID) Ask

func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)

Ask sends a synchronous message to another actor and expects a response. It blocks until a response is received or the timeout elapses.

func (*PID) BatchAsk

func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error)

BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func (*PID) BatchTell

func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Message) error

BatchTell sends a batch of messages asynchronously to the given PID. The messages will be processed one after the other in the order they are sent. This is a design choice that follows the simple principle of actors processing one message at a time. When BatchTell encounters a single message it falls back to a Tell call.

func (*PID) Child

func (pid *PID) Child(name string) (*PID, error)

Child returns the named child actor if it is alive

func (*PID) Children

func (pid *PID) Children() []*PID

Children returns the list of all the direct descendants of the given actor. Only alive actors are included in the list; otherwise an empty list is returned.

func (*PID) ChildrenCount added in v2.7.0

func (pid *PID) ChildrenCount() int

ChildrenCount returns the total number of children for the given PID

func (*PID) Equals

func (pid *PID) Equals(to *PID) bool

Equals is a convenient method to compare two PIDs

func (*PID) ID

func (pid *PID) ID() string

ID is a convenient method that returns the actor unique identifier. An actor unique identifier is its address in the actor system.

func (*PID) IsRunning

func (pid *PID) IsRunning() bool

IsRunning returns true when the actor is alive and ready to process messages, and false when the actor is stopped or not started at all

func (*PID) IsSuspended added in v2.10.0

func (pid *PID) IsSuspended() bool

IsSuspended returns true when the actor is suspended. A suspended actor is a faulty actor.

func (*PID) LatestProcessedDuration added in v2.7.0

func (pid *PID) LatestProcessedDuration() time.Duration

LatestProcessedDuration returns the duration of the latest message processed

func (*PID) Logger added in v2.6.2

func (pid *PID) Logger() log.Logger

Logger returns the logger set when creating the PID

func (*PID) Metric added in v2.11.1

func (pid *PID) Metric(ctx context.Context) *ActorMetric

Metric returns the actor metrics. The metric does not include any cluster data

func (*PID) Name added in v2.1.0

func (pid *PID) Name() string

Name returns the actor given name

func (*PID) Parent added in v2.8.2

func (pid *PID) Parent() *PID

Parent returns the parent of this PID

func (*PID) PipeTo added in v2.2.0

func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error

PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed

func (*PID) ProcessedCount added in v2.7.0

func (pid *PID) ProcessedCount() int

ProcessedCount returns the total number of messages processed at a given time

func (*PID) RemoteAsk

func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)

RemoteAsk sends a synchronous message to another actor remotely and expects a response.

func (*PID) RemoteBatchAsk

func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)

RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expects responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.

func (*PID) RemoteBatchTell

func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messages []proto.Message) error

RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner. Messages are processed one after the other in the order they are sent.

func (*PID) RemoteLookup

func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *goaktpb.Address, err error)

RemoteLookup looks for an actor address on a remote node.

func (*PID) RemoteReSpawn

func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) error

RemoteReSpawn restarts an actor on a remote node.

func (*PID) RemoteSpawn

func (pid *PID) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error

RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem

func (*PID) RemoteStop

func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops an actor on a remote node

func (*PID) RemoteTell

func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message proto.Message) error

RemoteTell sends a message to an actor remotely without expecting any reply

func (*PID) Restart

func (pid *PID) Restart(ctx context.Context) error

Restart restarts the actor. During restart all messages that are in the mailbox and not yet processed will be ignored. Only the direct alive children of the given actor will be shut down and respawned with their initial state. Bear in mind that restarting an actor will reinitialize the actor to its initial state. In case the restart of any direct child fails, the given actor will not be started at all.

func (*PID) RestartCount added in v2.7.0

func (pid *PID) RestartCount() int

RestartCount returns the total number of re-starts by the given PID

func (*PID) SendAsync added in v2.6.2

func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.Message) error

SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.

func (*PID) SendSync added in v2.6.2

func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error)

SendSync sends a synchronous message to another actor and expects a response. The location of the given actor is transparent to the caller. This call blocks until a response is received or the timeout expires.

func (*PID) Shutdown

func (pid *PID) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the given actor. All current messages in the mailbox will be processed before the actor shuts down, after a period of time that can be configured. All child actors will be gracefully shut down.

func (*PID) SpawnChild

func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)

SpawnChild creates a child actor and starts watching it for errors. When the given child actor already exists, only its PID will be returned.

func (*PID) StashSize

func (pid *PID) StashSize() uint64

StashSize returns the stash buffer size

func (*PID) Stop

func (pid *PID) Stop(ctx context.Context, cid *PID) error

Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped.

func (*PID) Tell

func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error

Tell sends an asynchronous message to another PID

func (*PID) UnWatch

func (pid *PID) UnWatch(cid *PID)

UnWatch stops watching a given actor

func (*PID) Uptime added in v2.10.1

func (pid *PID) Uptime() int64

Uptime returns the number of seconds since the actor started

func (*PID) Watch

func (pid *PID) Watch(cid *PID)

Watch watches a given actor for a Terminated message when the watched actor shuts down

type PanicError added in v2.10.0

type PanicError struct {
	// contains filtered or unexported fields
}

PanicError defines the panic error wrapping the underlying error

func NewPanicError added in v2.10.0

func NewPanicError(err error) PanicError

NewPanicError creates an instance of PanicError

func (PanicError) Error added in v2.10.0

func (e PanicError) Error() string

Error implements the standard error interface

type PostStopFunc

type PostStopFunc = func(ctx context.Context) error

PostStopFunc defines the PostStopFunc hook for an actor creation

type PreStartFunc

type PreStartFunc = func(ctx context.Context) error

PreStartFunc defines the PreStartFunc hook for an actor creation

type PriorityFunc added in v2.10.0

type PriorityFunc func(msg1, msg2 proto.Message) bool

PriorityFunc defines the priority function that will help determine the priority of two messages

type ReceiveContext

type ReceiveContext struct {
	// contains filtered or unexported fields
}

ReceiveContext is the context that is used by the actor to receive messages

func (*ReceiveContext) ActorSystem added in v2.8.2

func (rctx *ReceiveContext) ActorSystem() ActorSystem

ActorSystem returns the actor system

func (*ReceiveContext) Ask

func (rctx *ReceiveContext) Ask(to *PID, message proto.Message, timeout time.Duration) (response proto.Message)

Ask sends a synchronous message to another actor and expects a response. This method is good when interacting with a child actor. Ask has a timeout which can cause the sender to set the context error. When Ask times out, the receiving actor does not know and may still process the message. It is recommended to set a good timeout to quickly receive a response and try to avoid false positives.

func (*ReceiveContext) BatchAsk

func (rctx *ReceiveContext) BatchAsk(to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message)

BatchAsk sends a synchronous bunch of messages to the given PID and expects responses in the same order as the messages. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors.

func (*ReceiveContext) BatchTell

func (rctx *ReceiveContext) BatchTell(to *PID, messages ...proto.Message)

BatchTell sends an asynchronous bunch of messages to the given PID. The messages will be processed one after the other in the order they are sent. This is a design choice to follow the simple principle of one message at a time processing by actors. When BatchTell encounters a single message it will fall back to a Tell call.

func (*ReceiveContext) Become

func (rctx *ReceiveContext) Become(behavior Behavior)

Become switches the current behavior of the actor to a new behavior

func (*ReceiveContext) BecomeStacked

func (rctx *ReceiveContext) BecomeStacked(behavior Behavior)

BecomeStacked sets a new behavior to the actor. The current message in process during the transition will still be processed with the current behavior before the transition. However, subsequent messages will be processed with the new behavior. One needs to call UnBecomeStacked to go to the actor's next behavior, which is the default behavior.

func (*ReceiveContext) Child

func (rctx *ReceiveContext) Child(name string) *PID

Child returns the named child actor if it is alive

func (*ReceiveContext) Children

func (rctx *ReceiveContext) Children() []*PID

Children returns the list of all the children of the given actor

func (*ReceiveContext) Context

func (rctx *ReceiveContext) Context() context.Context

Context represents the context attached to the message

func (*ReceiveContext) Err

func (rctx *ReceiveContext) Err(err error)

Err is used instead of panicking within a message handler.

func (*ReceiveContext) Forward

func (rctx *ReceiveContext) Forward(to *PID)

Forward method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This operation does nothing when the receiving actor has not started. This method can only be used on a single node actor system because the actor reference PID is needed.

func (*ReceiveContext) ForwardTo added in v2.9.0

func (rctx *ReceiveContext) ForwardTo(actorName string)

ForwardTo method works similarly to the Tell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This operation does nothing when the receiving actor has not started. This method can only be used when the actor system is in cluster mode because it is location transparent.

func (*ReceiveContext) Logger added in v2.8.2

func (rctx *ReceiveContext) Logger() log.Logger

Logger returns the logger used in the actor system

func (*ReceiveContext) Message

func (rctx *ReceiveContext) Message() proto.Message

Message is the actual message sent

func (*ReceiveContext) PipeTo added in v2.2.0

func (rctx *ReceiveContext) PipeTo(to *PID, task future.Task)

PipeTo processes a long-running task and pipes the result to the provided actor. The successful result of the task will be put onto the provided actor mailbox. This is useful when interacting with external services. It’s common that you would like to use the value of the response in the actor when the long-running task is completed

func (*ReceiveContext) RemoteAsk

func (rctx *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any)

RemoteAsk is used to send a message to an actor remotely and expect a response immediately.

func (*ReceiveContext) RemoteBatchAsk

func (rctx *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any)

RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expects responses in the same order as the messages. Messages are processed one after the other in the order they are sent. This can hinder performance if it is not properly used.

func (*ReceiveContext) RemoteBatchTell

func (rctx *ReceiveContext) RemoteBatchTell(to *address.Address, messages []proto.Message)

RemoteBatchTell sends a batch of messages to a remote actor in a fire-and-forget manner. Messages are processed one after the other in the order they are sent.

func (*ReceiveContext) RemoteForward added in v2.9.0

func (rctx *ReceiveContext) RemoteForward(to *address.Address)

RemoteForward method works similarly to the RemoteTell() method except that the sender of a forwarded message is kept as the original sender. As a result, the actor receiving the forwarded messages knows who the actual sender of the message is. The message that is forwarded is the current message received by the received context. This method can only be used when remoting is enabled on the running actor system

func (*ReceiveContext) RemoteLookup

func (rctx *ReceiveContext) RemoteLookup(host string, port int, name string) (addr *goaktpb.Address)

RemoteLookup looks for an actor address on a remote node. If the actorSystem is nil then the lookup will be done using the same actor system as the PID actor system.

func (*ReceiveContext) RemoteReSpawn

func (rctx *ReceiveContext) RemoteReSpawn(host string, port int, name string)

RemoteReSpawn restarts an actor on a remote node.

func (*ReceiveContext) RemoteSender

func (rctx *ReceiveContext) RemoteSender() *address.Address

RemoteSender defines the remote sender of the message if it is a remote message. This is set to NoSender when the message is not a remote message.

func (*ReceiveContext) RemoteTell

func (rctx *ReceiveContext) RemoteTell(to *address.Address, message proto.Message)

RemoteTell sends a message to an actor remotely without expecting any reply

func (*ReceiveContext) Response

func (rctx *ReceiveContext) Response(resp proto.Message)

Response sets the message response

func (*ReceiveContext) Self

func (rctx *ReceiveContext) Self() *PID

Self returns the receiver PID of the message

func (*ReceiveContext) SendAsync added in v2.6.2

func (rctx *ReceiveContext) SendAsync(actorName string, message proto.Message)

SendAsync sends an asynchronous message to a given actor. The location of the given actor is transparent to the caller.

func (*ReceiveContext) SendSync added in v2.6.2

func (rctx *ReceiveContext) SendSync(actorName string, message proto.Message, timeout time.Duration) (response proto.Message)

SendSync sends a synchronous message to another actor and expects a response. The location of the given actor is transparent to the caller. This call blocks until a response is received or the timeout expires.

func (*ReceiveContext) Sender

func (rctx *ReceiveContext) Sender() *PID

Sender of the message

func (*ReceiveContext) Shutdown

func (rctx *ReceiveContext) Shutdown()

Shutdown gracefully shuts down the given actor. All current messages in the mailbox will be processed before the actor shuts down, after a period of time that can be configured. All child actors will be gracefully shut down.

func (*ReceiveContext) Spawn

func (rctx *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID

Spawn creates a child actor or return error

func (*ReceiveContext) Stash

func (rctx *ReceiveContext) Stash()

Stash enables an actor to temporarily buffer all or some messages that cannot or should not be handled using the actor’s current behavior

func (*ReceiveContext) Stop

func (rctx *ReceiveContext) Stop(child *PID)

Stop forces the child Actor under the given name to terminate after it finishes processing its current message. Nothing happens if child is already stopped. However, it returns an error when the child cannot be stopped.

func (*ReceiveContext) Tell

func (rctx *ReceiveContext) Tell(to *PID, message proto.Message)

Tell sends an asynchronous message to another PID

func (*ReceiveContext) UnBecome

func (rctx *ReceiveContext) UnBecome()

UnBecome resets the actor behavior to the default one

func (*ReceiveContext) UnBecomeStacked

func (rctx *ReceiveContext) UnBecomeStacked()

UnBecomeStacked sets the actor behavior to the next behavior before BecomeStacked was called

func (*ReceiveContext) UnWatch added in v2.8.2

func (rctx *ReceiveContext) UnWatch(cid *PID)

UnWatch stops watching a given actor

func (*ReceiveContext) Unhandled

func (rctx *ReceiveContext) Unhandled()

Unhandled is used to handle unhandled messages instead of throwing error

func (*ReceiveContext) Unstash

func (rctx *ReceiveContext) Unstash()

Unstash unstashes the oldest message in the stash and prepends to the mailbox

func (*ReceiveContext) UnstashAll

func (rctx *ReceiveContext) UnstashAll()

UnstashAll unstashes all messages from the stash buffer and prepends them to the mailbox. It keeps the messages in the same order as received, unstashing older messages before newer ones.

func (*ReceiveContext) Watch added in v2.8.2

func (rctx *ReceiveContext) Watch(cid *PID)

Watch watches a given actor for a Terminated message when the watched actor shuts down

type ReceiveFunc

type ReceiveFunc = func(ctx context.Context, message proto.Message) error

ReceiveFunc is a message handling placeholder

type Remoting added in v2.8.0

type Remoting struct {
	// contains filtered or unexported fields
}

Remoting defines the Remoting APIs. This requires that Remoting is enabled on the connected actor system.

func NewRemoting added in v2.8.0

func NewRemoting(opts ...RemotingOption) *Remoting

NewRemoting creates an instance of Remoting with an insecure connection. To use a secure connection one needs to pass the WithRemotingTLS option to set the TLS configuration of the secure connection. This requires that Remoting is enabled on the connected actor system. Make sure to call Close to free up resources, otherwise you may be leaking socket connections.

One can also override the remoting option with a custom one when calling any of the methods.

func (*Remoting) Close added in v2.8.0

func (r *Remoting) Close()

Close closes the serviceClient connection

func (*Remoting) RemoteAsk added in v2.8.0

func (r *Remoting) RemoteAsk(ctx context.Context, from, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error)

RemoteAsk sends a synchronous message to another actor remotely and expects a response.

func (*Remoting) RemoteBatchAsk added in v2.8.0

func (r *Remoting) RemoteBatchAsk(ctx context.Context, from, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error)

RemoteBatchAsk sends bulk messages to an actor with responses expected

func (*Remoting) RemoteBatchTell added in v2.8.0

func (r *Remoting) RemoteBatchTell(ctx context.Context, from, to *address.Address, messages []proto.Message) error

RemoteBatchTell sends bulk asynchronous messages to an actor

func (*Remoting) RemoteLookup added in v2.8.0

func (r *Remoting) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *address.Address, err error)

RemoteLookup looks for an actor address on a remote node.

func (*Remoting) RemoteReSpawn added in v2.8.0

func (r *Remoting) RemoteReSpawn(ctx context.Context, host string, port int, name string) error

RemoteReSpawn restarts an actor on a remote node.

func (*Remoting) RemoteSpawn added in v2.8.0

func (r *Remoting) RemoteSpawn(ctx context.Context, host string, port int, name, actorType string) error

RemoteSpawn creates an actor on a remote node. The given actor needs to be registered on the remote node using the Register method of ActorSystem

func (*Remoting) RemoteStop added in v2.8.0

func (r *Remoting) RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops an actor on a remote node.

func (*Remoting) RemoteTell added in v2.8.0

func (r *Remoting) RemoteTell(ctx context.Context, from, to *address.Address, message proto.Message) error

RemoteTell sends a message to an actor remotely without expecting any reply

type RemotingOption added in v2.12.0

type RemotingOption func(*Remoting)

RemotingOption sets the remoting option

func WithRemotingTLS added in v2.12.0

func WithRemotingTLS(tlsConfig *tls.Config) RemotingOption

WithRemotingTLS configures the remoting system to use a secure connection for communication with the specified remote node. This requires a TLS client configuration to enable secure interactions with the remote actor system.

Ensure that the remote actor system is configured with TLS enabled and capable of completing a successful handshake. It is recommended that both systems share the same root Certificate Authority (CA) for mutual trust and secure communication.

type RestartDirective

type RestartDirective struct {
	// contains filtered or unexported fields
}

RestartDirective defines the supervisor restart directive

func NewRestartDirective added in v2.4.0

func NewRestartDirective() *RestartDirective

NewRestartDirective creates an instance of RestartDirective

func (*RestartDirective) MaxNumRetries added in v2.4.0

func (x *RestartDirective) MaxNumRetries() uint32

MaxNumRetries returns the max num retries

func (*RestartDirective) Timeout added in v2.4.0

func (x *RestartDirective) Timeout() time.Duration

Timeout returns the timeout

func (*RestartDirective) WithLimit added in v2.4.0

func (x *RestartDirective) WithLimit(maxNumRetries uint32, timeout time.Duration)

WithLimit sets the restart limit

type ResumeDirective added in v2.4.0

type ResumeDirective struct{}

ResumeDirective defines the supervisor resume directive. This ignores the failure and processes the next message instead.

func NewResumeDirective added in v2.4.0

func NewResumeDirective() *ResumeDirective

NewResumeDirective creates an instance of ResumeDirective

type RouterOption added in v2.4.0

type RouterOption interface {
	// Apply sets the Option value of a config.
	Apply(cl *router)
}

RouterOption is the interface that applies a configuration option.

func WithRoutingStrategy added in v2.4.0

func WithRoutingStrategy(strategy RoutingStrategy) RouterOption

WithRoutingStrategy sets the routing strategy

type RouterOptionFunc added in v2.4.0

type RouterOptionFunc func(router *router)

RouterOptionFunc implements the RouterOption interface.

func (RouterOptionFunc) Apply added in v2.4.0

func (f RouterOptionFunc) Apply(c *router)

type RoutingStrategy added in v2.4.0

type RoutingStrategy int

RoutingStrategy defines the routing strategy to use when defining routers

const (
	RoundRobinRouting RoutingStrategy = iota
	RandomRouting
	FanOutRouting
)

type ShutdownHook added in v2.11.0

type ShutdownHook func(ctx context.Context) error

ShutdownHook defines the shutdown hook to be executed alongside the termination of the actor system

type SpawnError added in v2.11.0

type SpawnError struct {
	// contains filtered or unexported fields
}

SpawnError defines an error when re/creating an actor

func NewSpawnError added in v2.11.0

func NewSpawnError(err error) SpawnError

NewSpawnError returns an instance of SpawnError

func (SpawnError) Error added in v2.11.0

func (s SpawnError) Error() string

Error implements the standard error interface

type SpawnOption added in v2.7.0

type SpawnOption interface {
	// Apply sets the Option value of a config.
	Apply(config *spawnConfig)
}

SpawnOption is the interface that applies a spawn configuration option.

func WithLongLived added in v2.13.0

func WithLongLived() SpawnOption

WithLongLived ensures that the given actor, once created, will persist for the entire lifespan of the running actor system. Unlike short-lived actors that may be restarted or garbage-collected, a long-lived actor remains active until the actor system itself shuts down.

func WithMailbox

func WithMailbox(mailbox Mailbox) SpawnOption

WithMailbox sets the mailbox to use when starting the given actor. Care should be taken when using a specific mailbox for a given actor on how to handle messages, particularly when it comes to a priority mailbox.

func WithPassivateAfter added in v2.13.0

func WithPassivateAfter(after time.Duration) SpawnOption

WithPassivateAfter sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.

func WithSupervisorStrategies added in v2.10.0

func WithSupervisorStrategies(supervisorStrategies ...*SupervisorStrategy) SpawnOption

WithSupervisorStrategies defines the supervisor strategies to apply when the given actor fails or panics during its messages processing

type StopDirective

type StopDirective struct{}

StopDirective defines the supervisor stop directive

func NewStopDirective added in v2.4.0

func NewStopDirective() *StopDirective

NewStopDirective creates an instance of StopDirective

type SupervisorDirective added in v2.4.0

type SupervisorDirective interface {
	// contains filtered or unexported methods
}

SupervisorDirective defines the supervisor directive

type SupervisorStrategy added in v2.10.0

type SupervisorStrategy struct {
	// contains filtered or unexported fields
}

SupervisorStrategy defines the rules to apply to a faulty actor during message processing

func NewSupervisorStrategy added in v2.10.0

func NewSupervisorStrategy(err error, directive SupervisorDirective) *SupervisorStrategy

NewSupervisorStrategy creates an instance of SupervisorStrategy

func (*SupervisorStrategy) Directive added in v2.10.0

func (s *SupervisorStrategy) Directive() SupervisorDirective

Directive returns the directive of the supervisor strategy

func (*SupervisorStrategy) Error added in v2.10.0

func (s *SupervisorStrategy) Error() error

Error returns the error type of the supervisor strategy

type TLSInfo added in v2.12.0

type TLSInfo struct {
	// ClientConfig defines the client TLS config
	ClientConfig *tls.Config
	// ServerConfig defines the server TLS config
	ServerConfig *tls.Config
}

TLSInfo defines the TLS settings. Ensure that both the Server and Client are configured with the same root Certificate Authority (CA) to enable successful handshake and mutual authentication.

type UnboundedMailbox added in v2.7.0

type UnboundedMailbox struct {
	// contains filtered or unexported fields
}

UnboundedMailbox is a Multi-Producer-Single-Consumer Queue (FIFO). Reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html

func NewUnboundedMailbox added in v2.7.0

func NewUnboundedMailbox() *UnboundedMailbox

NewUnboundedMailbox creates an instance of UnboundedMailbox

func (*UnboundedMailbox) Dequeue added in v2.7.0

func (m *UnboundedMailbox) Dequeue() *ReceiveContext

Dequeue takes the mail from the mailbox. Returns nil if the mailbox is empty. Can be used in a single consumer (goroutine) only.

func (*UnboundedMailbox) Dispose added in v2.10.1

func (m *UnboundedMailbox) Dispose()

Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.

func (*UnboundedMailbox) Enqueue added in v2.7.0

func (m *UnboundedMailbox) Enqueue(value *ReceiveContext) error

Enqueue places the given value in the mailbox

func (*UnboundedMailbox) IsEmpty added in v2.7.0

func (m *UnboundedMailbox) IsEmpty() bool

IsEmpty returns true when the mailbox is empty

func (*UnboundedMailbox) Len added in v2.7.0

func (m *UnboundedMailbox) Len() int64

Len returns mailbox length

type UnboundedPriorityMailBox added in v2.10.0

type UnboundedPriorityMailBox struct {
	// contains filtered or unexported fields
}

UnboundedPriorityMailBox is a Priority Queue (FIFO). It implements a binary heap (using the standard library container/heap).

func NewUnboundedPriorityMailBox added in v2.10.0

func NewUnboundedPriorityMailBox(priorityFunc PriorityFunc) *UnboundedPriorityMailBox

NewUnboundedPriorityMailBox creates an instance of UnboundedPriorityMailBox

func (*UnboundedPriorityMailBox) Dequeue added in v2.10.0

func (q *UnboundedPriorityMailBox) Dequeue() (msg *ReceiveContext)

Dequeue takes the mail from the mailbox based upon the priority function defined when initializing the mailbox

func (*UnboundedPriorityMailBox) Dispose added in v2.10.1

func (q *UnboundedPriorityMailBox) Dispose()

Dispose will dispose of this queue and free any blocked threads in the Enqueue and/or Dequeue methods.

func (*UnboundedPriorityMailBox) Enqueue added in v2.10.0

Enqueue places the given value in the mailbox. The given message must be a priority message, otherwise an error will be returned.

func (*UnboundedPriorityMailBox) IsEmpty added in v2.10.0

func (q *UnboundedPriorityMailBox) IsEmpty() bool

IsEmpty returns true when the mailbox is empty

func (*UnboundedPriorityMailBox) Len added in v2.10.0

Len returns mailbox length

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL