Documentation ¶
Index ¶
- Constants
- Variables
- func IsBlacklistedActivationError(err error) bool
- func NewBlacklistedActivationError(err error, serverIDs []string) error
- type Actor
- type ActorBytes
- type ActorStream
- type BlacklistedActivationErr
- type CreateActorResult
- type Debug
- type DiscoveryOptions
- type Environment
- func NewDNSRegistryEnvironment(ctx context.Context, host string, port int, opts EnvironmentOptions) (Environment, registry.Registry, error)
- func NewEnvironment(ctx context.Context, serverID string, reg registry.Registry, ...) (Environment, error)
- func NewTestDNSRegistryEnvironment(ctx context.Context, opts EnvironmentOptions) (Environment, registry.Registry, error)
- type EnvironmentOptions
- type HTTPError
- type HostCapabilities
- type InvokeActorResult
- type Module
- type RemoteClient
- type ScheduleInvocationResult
- type Server
Constants ¶
const ( // DiscoveryTypeLocalHost indicates that the environment should advertise its IP // address as localhost to the discovery service. DiscoveryTypeLocalHost = "localhost" // DiscoveryTypeRemote indicates that the environment should advertise its // actual IP to the discovery service. DiscoveryTypeRemote = "remote" )
const DefaultHTTPRequestTimeout = 15 * time.Second
const (
Localhost = "127.0.0.1"
)
Variables ¶
var ( // ErrEnvironmentClosed is an error that indicates the environment is closed. // It can be returned when attempting to perform an operation on a closed environment. ErrEnvironmentClosed = errors.New("environment is closed") DefaultGCActorsAfterDurationWithNoInvocations = time.Minute )
Functions ¶
func IsBlacklistedActivationError ¶
IsBlacklistedActivationError returns a boolean indicating whether the error was caused by the actor being blacklisted from being activated on the server.
func NewBlacklistedActivationError ¶
NewBlacklistedActivationError creates a new BlacklistedActivationErr.
Types ¶
type Actor ¶
type Actor interface { // MemoryUsageBytes returns the estimated amount of memory the actor is using // in terms of bytes. This method will be called after every actor invocation // so its implementation should be efficient. MemoryUsageBytes() int // Close closes the in-memory actor. Close(ctx context.Context) error }
Actor represents an activated actor in memory.
type ActorBytes ¶
type ActorBytes interface { Actor // Invoke invokes the specified operation on the in-memory actor with the provided // payload. The transaction is invocation-specific and will automatically be // committed or rolled back / canceled based on whether Invoke returns an error. Invoke( ctx context.Context, operation string, payload []byte, ) ([]byte, error) }
ActorBytes is the version of Actor that returns responses as a []byte directly.
type ActorStream ¶
type ActorStream interface { Actor InvokeStream( ctx context.Context, operation string, payload []byte, ) (io.ReadCloser, error) }
ActorStream is the same as ByteActor, except it can return responses as streams instead of []byte which is useful in scenarios where large amounts of data need to be shuttled around. It also allows the actor to behave in an "async" manner by return streams and then "filling them in" later.
type BlacklistedActivationErr ¶
type BlacklistedActivationErr struct {
// contains filtered or unexported fields
}
BlacklistedActivationErr indicates that the actor activation has been blacklisted on this specific server temporarily (usually due to resource usage or balancing reasons).
func (BlacklistedActivationErr) Error ¶
func (b BlacklistedActivationErr) Error() string
func (BlacklistedActivationErr) HTTPStatusCode ¶
func (b BlacklistedActivationErr) HTTPStatusCode() int
func (BlacklistedActivationErr) Is ¶
func (b BlacklistedActivationErr) Is(target error) bool
func (BlacklistedActivationErr) ServerIDs ¶
func (b BlacklistedActivationErr) ServerIDs() []string
type CreateActorResult ¶
type CreateActorResult struct { }
type Debug ¶
type Debug interface { // NumActivatedActors returns the number of activated actors in the environment. It is // primarily used for tests. NumActivatedActors() int // Heartbeat forces the environment to heartbeat the Registry immediately. It is primarily // used for tests. Heartbeat() error // contains filtered or unexported methods }
Debug contains private methods that are only used for debugging / tests.
type DiscoveryOptions ¶
type DiscoveryOptions struct { // DiscoveryType is one of DiscoveryTypeLocalHost or DiscoveryTypeRemote. DiscoveryType string // Port is the port that the environment should advertise to the discovery // service. Port int // AllowFailedInitialHeartbeat can be set to true to allow the environment // to instantiate itself even if the initial heartbeat fails. This is useful // to avoid circular startup dependencies when using the leaderregistry // implementation which requires at least one environment to be up and running // to bootstrap the cluster. AllowFailedInitialHeartbeat bool }
DiscoveryOptions contains the discovery-related options.
func (*DiscoveryOptions) Validate ¶
func (d *DiscoveryOptions) Validate() error
type Environment ¶
type Environment interface { Debug // RegisterGoModule registers a new Go module in the environment so it can be used in // subsequent calls. RegisterGoModule can be called at any time, even once the // Environmnt has been in use for a long time. However, the primary reason this method // exists (instead of being an argument provided to the Environment constructor) is // so that applications can write many different packages that all accept an instance // of Environment as a dependency and "register" whatever Go modules they need without // having to register all the Go modules for all the different packages in a single // place. RegisterGoModule(id types.NamespacedIDNoType, module Module) error // InvokeActor invokes the specified operation on the specified actorID with the // provided payload. If the actor is already activated somewhere in the system, // the invocation will be routed appropriately. Otherwise, the request will // activate the actor somewhere in the system and then perform the invocation. InvokeActor( ctx context.Context, namespace string, actorID string, moduleID string, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) ([]byte, error) // InvokeActorJSON is the same as InvokeActor, except it implements the functionality // JSON marshaling the request payload and JSON unmarshaling the response payload. InvokeActorJSON( ctx context.Context, namespace string, actorID string, moduleID string, operation string, payload any, createIfNotExist types.CreateIfNotExist, resp any, ) error // InvokeActorStream is the same as InvokeActor, except it uses the streaming // interface instead of returning a []byte directly. This is useful for actors // that need to shuttle large volumes of data around (perhaps in an async manner). InvokeActorStream( ctx context.Context, namespace string, actorID string, moduleID string, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) (io.ReadCloser, error) // InvokeActorDirect is the same as InvokeActor, however, it performs the invocation // "directly". // // This method should only be called if the Registry has indicated that the specified // actorID should be activated in this process. If this constraint is violated then // inconsistencies may be introduced into the system. InvokeActorDirect( ctx context.Context, versionStamp int64, serverID string, serverVersion int64, reference types.ActorReferenceVirtual, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) ([]byte, error) // InvokeActorDirectStream is the same as InvokeActorDirect, except it uses the streaming // interface instead of returning a []byte directly. This is useful for actors that need // to shuttle large volumes of data around (perhaps in an async manner). InvokeActorDirectStream( ctx context.Context, versionStamp int64, serverID string, serverVersion int64, reference types.ActorReferenceVirtual, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) (io.ReadCloser, error) // InvokeWorker invokes the specified operation from the specified module. Unlike // actors, workers provide no guarantees about single-threaded execution or only // a single instance running at a time. This makes them easier to scale than // actors. They're especially useful for large workloads that don't require the // same guarantees actors provide. // // Also keep in mind that actor's can still "accumulate" in-memory state, just like // actors. However, there is no guarantee of linearizability like with Actors so // callers may see "inconsistent" memory state depending on which server/environment // their worker invocation is routed to. InvokeWorker( ctx context.Context, namespace string, moduleID string, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) ([]byte, error) // InvokeWorkerStream is the same as InvokeWorker, except it uses the streaming interface // instead of returning a []byte directly. This is useful for actors that need to shuttle // large volumes of data around (perhaps in an async manner). InvokeWorkerStream( ctx context.Context, namespace string, moduleID string, operation string, payload []byte, createIfNotExist types.CreateIfNotExist, ) (io.ReadCloser, error) // Close closes the Environment and all of its associated resources. Close(context.Context) error }
Environment is the interface responsible for routing invocations to the appropriate actor. If the actor is not currently activated in the environment, it will take care of activating it.
func NewDNSRegistryEnvironment ¶
func NewDNSRegistryEnvironment( ctx context.Context, host string, port int, opts EnvironmentOptions, ) (Environment, registry.Registry, error)
NewDNSRegistryEnvironment is a convenience function that creates a virtual environment backed by a DNS-based registry. It is configured with reasonable defaults that make it suitable for production usage. Note that this convenience function is particularly nice because it can also be used for unit/integration tests and local development simply by passing virtual.Localhost as the value of host.
func NewEnvironment ¶
func NewEnvironment( ctx context.Context, serverID string, reg registry.Registry, moduleStore registry.ModuleStore, client RemoteClient, opts EnvironmentOptions, ) (Environment, error)
NewEnvironment creates a new Environment.
func NewTestDNSRegistryEnvironment ¶
func NewTestDNSRegistryEnvironment( ctx context.Context, opts EnvironmentOptions, ) (Environment, registry.Registry, error)
NewTestDNSRegistryEnvironment is a convenience function that creates a virtual environment backed by a DNS-based registry. It is configured already to generate a suitable setting up for writing unit/integration tests, but not for production usage.
type EnvironmentOptions ¶
type EnvironmentOptions struct { // ActivationCacheTTL is the TTL of the activation cache. ActivationCacheTTL time.Duration // DisableActivationCache disables the activation cache. DisableActivationCache bool // Discovery contains the discovery options. Discovery DiscoveryOptions // ForceRemoteProcedureCalls forces the environment to *always* invoke // actors via RPC even if the actor is activated on the node // that originally received the request. ForceRemoteProcedureCalls bool // CustomHostFns contains a set of additional user-defined host // functions that can be exposed to activated actors. This allows // developeres leveraging NOLA as a library to extend the environment // with additional host functionality. CustomHostFns map[string]func([]byte) ([]byte, error) // GCActorsAfterDurationWithNoInvocations is the duration after which an // activated actor that receives no invocations will be GC'd out of memory. // // The actor's shutdown function will be invoked before the actor is GC'd. // // A value of 0 will be ignored and replaced with the default value of // DefaultGCActorsAfterDurationWithNoInvocations. To disable this // functionality entirely, just use a really large value. GCActorsAfterDurationWithNoInvocations time.Duration // MaxNumShutdownWorkers specifies the number of workers used for shutting down the active actors // in the environment. This determines the level of parallelism and CPU resources utilized // during the shutdown process. By default, all available CPUs (runtime.NumCPU()) are used. MaxNumShutdownWorkers int // Logger is a logging instance used for logging messages. // If no logger is provided, the default logger from the slog package (slog.Default()) will be used. Logger *slog.Logger }
EnvironmentOptions is the settings for the Environment.
func (*EnvironmentOptions) Validate ¶
func (e *EnvironmentOptions) Validate() error
type HTTPError ¶
type HTTPError interface {
HTTPStatusCode() int
}
HTTPError is the interface implemented by errors that map to a specific status code. It should be used in conjunction with statusCodeToErrorWrapper so that the status code is automatically set on the server, and the status code is automatically translated back into the appropriate error wrapped by the client.
type HostCapabilities ¶
type HostCapabilities interface { // InvokeActor invokes a function on the specified actor. InvokeActor(context.Context, types.InvokeActorRequest) ([]byte, error) // ScheduleSelfTimer is the same as InvokeActor, except the invocation is scheduled // in memory to be run later on the calling actor, and only if the actor is still // instantiated / activated in-memory when the timer fires. ScheduleSelfTimer(context.Context, wapcutils.ScheduleSelfTimer) error // CustomFn invoke a custom (user defined) host function. This will only work if the // custom host function was registered with the environment when it was instantiated. CustomFn( ctx context.Context, operation string, payload []byte, ) ([]byte, error) }
HostCapabilities defines the interface of capabilities exposed by the host to the Actor.
type InvokeActorResult ¶
type InvokeActorResult struct { }
type Module ¶
type Module interface { // Instantiate instantiates a new in-memory actor from the module. Instantiate( ctx context.Context, reference types.ActorReferenceVirtual, payload []byte, host HostCapabilities, ) (Actor, error) // Close closes the modules. Close(ctx context.Context) error }
Module represents a "module" / template from which new actors are constructed/instantiated.
type RemoteClient ¶
type RemoteClient interface { // InvokeActorRemote is the same as Invoke, however, it performs the actor invocation on a // specific remote server. InvokeActorRemote( ctx context.Context, versionStamp int64, reference types.ActorReference, operation string, payload []byte, create types.CreateIfNotExist, ) (io.ReadCloser, error) }
RemoteClient is the interface implemented by a client that is capable of communicating with remote nodes in the system.
func NewHTTPClient ¶
func NewHTTPClient() RemoteClient
NewHTTPClient returns a new HTTPClient that implements the RemoteClient interface.
type ScheduleInvocationResult ¶
type ScheduleInvocationResult struct { }
type Server ¶
func NewServer ¶
func NewServer( moduleStore registry.ModuleStore, environment Environment, ) *Server
NewServer creates a new server for the actor virtual environment.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
tuple
Package tuple provides a layer for encoding and decoding multi-element tuples into keys usable by FoundationDB.
|
Package tuple provides a layer for encoding and decoding multi-element tuples into keys usable by FoundationDB. |
fdbregistry
Module
|
|