actors

package
v0.0.0-...-949823d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2024 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Overview

Copyright 2023 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const InternalActorTypePrefix = "dapr.internal."

Variables

View Source
var (
	ErrIncompatibleStateStore        = errors.New("actor state store does not exist, or does not support transactions which are required to save state - please see https://docs.dapr.io/operations/components/setup-state-store/supported-state-stores/")
	ErrReminderOpActorNotHosted      = errors.New("operations on actor reminders are only possible on hosted actor types")
	ErrTransactionsTooManyOperations = errors.New("the transaction contains more operations than supported by the state store")
	ErrReminderCanceled              = internal.ErrReminderCanceled
)
View Source
var ErrActorDisposed = errors.New("actor is already disposed")

ErrActorDisposed is the error when runtime tries to hold the lock of the disposed actor.

View Source
var ErrMaxStackDepthExceeded = errors.New("maximum stack depth exceeded")

Functions

func DecodeInternalActorData

func DecodeInternalActorData(data io.Reader, e any) error

DecodeInternalActorData decodes encoding/gob data and stores the result in e.

func EncodeInternalActorData

func EncodeInternalActorData(result any) ([]byte, error)

EncodeInternalActorData encodes result using the encoding/gob format.

func NewHostedActors

func NewHostedActors(actorTypes []string) hostedActors

NewHostedActors creates a new hostedActors from a slice of actor types.

func ValidateHostEnvironment

func ValidateHostEnvironment(mTLSEnabled bool, mode modes.DaprMode, namespace string) error

ValidateHostEnvironment validates that actors can be initialized properly given a set of parameters And the mode the runtime is operating in.

Types

type ActorHostedRequest

type ActorHostedRequest struct {
	ActorID   string `json:"actorId"`
	ActorType string `json:"actorType"`
}

ActorHostedRequest is the request object for checking if an actor is hosted on this instance.

func (ActorHostedRequest) ActorKey

func (r ActorHostedRequest) ActorKey() string

ActorKey returns the key of the actor for this request.

type ActorLock

type ActorLock struct {
	// contains filtered or unexported fields
}

func NewActorLock

func NewActorLock(maxStackDepth int32) *ActorLock

func (*ActorLock) Lock

func (a *ActorLock) Lock(requestID *string) error

func (*ActorLock) Unlock

func (a *ActorLock) Unlock()

type ActorRuntime

type ActorRuntime interface {
	Actors
	io.Closer
	Init(context.Context) error
	IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool
	GetRuntimeStatus(ctx context.Context) *runtimev1pb.ActorRuntime
	RegisterInternalActor(ctx context.Context, actorType string, actor InternalActorFactory, actorIdleTimeout time.Duration) error
	Entities() []string
}

ActorRuntime is the main runtime for the actors subsystem.

func NewActors

func NewActors(opts ActorsOpts) (ActorRuntime, error)

NewActors create a new actors runtime with given config.

type Actors

type Actors interface {
	// Call an actor.
	Call(ctx context.Context, req *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error)
	// GetState retrieves actor state.
	GetState(ctx context.Context, req *GetStateRequest) (*StateResponse, error)
	// GetBulkState retrieves actor state in bulk.
	GetBulkState(ctx context.Context, req *GetBulkStateRequest) (BulkStateResponse, error)
	// TransactionalStateOperation performs a transactional state operation with the actor state store.
	TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error
	// GetReminder retrieves an actor reminder.
	GetReminder(ctx context.Context, req *GetReminderRequest) (*internal.Reminder, error)
	// CreateReminder creates an actor reminder.
	CreateReminder(ctx context.Context, req *CreateReminderRequest) error
	// DeleteReminder deletes an actor reminder.
	DeleteReminder(ctx context.Context, req *DeleteReminderRequest) error
	// CreateTimer creates an actor timer.
	CreateTimer(ctx context.Context, req *CreateTimerRequest) error
	// DeleteTimer deletes an actor timer.
	DeleteTimer(ctx context.Context, req *DeleteTimerRequest) error
	// ExecuteLocalOrRemoteActorReminder executes a reminder on a local or remote actor.
	ExecuteLocalOrRemoteActorReminder(ctx context.Context, reminder *CreateReminderRequest) error
}

Actors allow calling into virtual actors as well as actor state management.

type ActorsOpts

type ActorsOpts struct {
	AppChannel         channel.AppChannel
	GRPCConnectionFn   GRPCConnectionFn
	Config             Config
	TracingSpec        config.TracingSpec
	Resiliency         resiliency.Provider
	StateStoreName     string
	CompStore          *compstore.ComponentStore
	Security           security.Handler
	SchedulerClients   *clients.Clients
	SchedulerReminders bool
	Healthz            healthz.Healthz

	// TODO: @joshvanl Remove in Dapr 1.12 when ActorStateTTL is finalized.
	StateTTLEnabled bool

	// MockPlacement is a placement service implementation used for testing
	MockPlacement internal.PlacementService
}

ActorsOpts contains options for NewActors.

type BulkStateResponse

type BulkStateResponse map[string][]byte

BulkStateResponse is the response returned from getting an actor state in bulk. It's a map where the key is the key of the state, and the value is the value as byte slice.

type Config

type Config struct {
	internal.Config
}

Config is the actor runtime configuration.

func NewConfig

func NewConfig(opts ConfigOpts) Config

NewConfig returns the actor runtime configuration.

func (*Config) GetDrainOngoingTimeoutForType

func (c *Config) GetDrainOngoingTimeoutForType(actorType string) time.Duration

func (*Config) GetDrainRebalancedActorsForType

func (c *Config) GetDrainRebalancedActorsForType(actorType string) bool

func (*Config) GetIdleTimeoutForType

func (c *Config) GetIdleTimeoutForType(actorType string) time.Duration

func (Config) GetPlacementProvider

func (c Config) GetPlacementProvider() (placementProviderFactory, error)

GetPlacementProvider returns the factory method for the configured placement provider

func (*Config) GetReentrancyForType

func (c *Config) GetReentrancyForType(actorType string) daprAppConfig.ReentrancyConfig

func (Config) GetRemindersProvider

func (c Config) GetRemindersProvider(placement internal.PlacementService) (remindersProviderFactory, error)

GetRemindersProvider returns the factory method for the configured reminders provider

type ConfigOpts

type ConfigOpts struct {
	HostAddress       string
	AppID             string
	ActorsService     string
	RemindersService  string
	SchedulerClients  *clients.Clients
	Port              int
	Namespace         string
	AppConfig         daprAppConfig.ApplicationConfig
	HealthHTTPClient  *http.Client
	HealthEndpoint    string
	AppChannelAddress string
	PodName           string
}

ConfigOpts contains options for NewConfig.

type CreateReminderRequest

type CreateReminderRequest = internal.CreateReminderRequest

CreateReminderRequest is the request object to create a new reminder.

type CreateTimerRequest

type CreateTimerRequest = internal.CreateTimerRequest

CreateTimerRequest is the request object to create a new timer.

type DeleteReminderRequest

type DeleteReminderRequest = internal.DeleteReminderRequest

DeleteReminderRequest is the request object for deleting a reminder.

type DeleteStateRequest

type DeleteStateRequest struct {
	ActorID   string `json:"actorId"`
	ActorType string `json:"actorType"`
	Key       string `json:"key"`
}

DeleteStateRequest is the request object for deleting an actor state.

func (DeleteStateRequest) ActorKey

func (r DeleteStateRequest) ActorKey() string

ActorKey returns the key of the actor for this request.

type DeleteTimerRequest

type DeleteTimerRequest = internal.DeleteTimerRequest

DeleteTimerRequest is a request object for deleting a timer.

type GRPCConnectionFn

type GRPCConnectionFn func(ctx context.Context, address string, id string, namespace string, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(destroy bool), error)

GRPCConnectionFn is the type of the function that returns a gRPC connection

type GetBulkStateRequest

type GetBulkStateRequest struct {
	ActorID   string   `json:"actorId"`
	ActorType string   `json:"actorType"`
	Keys      []string `json:"keys"`
}

GetBulkStateRequest is the request object for getting bulk actor state.

func (GetBulkStateRequest) ActorKey

func (r GetBulkStateRequest) ActorKey() string

ActorKey returns the key of the actor for this request.

type GetReminderRequest

type GetReminderRequest = internal.GetReminderRequest

GetReminderRequest is the request object to get an existing reminder.

type GetStateRequest

type GetStateRequest struct {
	ActorID   string `json:"actorId"`
	ActorType string `json:"actorType"`
	Key       string `json:"key"`
}

GetStateRequest is the request object for getting actor state.

func (GetStateRequest) ActorKey

func (r GetStateRequest) ActorKey() string

ActorKey returns the key of the actor for this request.

type InternalActor

type InternalActor interface {
	InvokeMethod(ctx context.Context, methodName string, data []byte, metadata map[string][]string) ([]byte, error)
	DeactivateActor(ctx context.Context) error
	InvokeReminder(ctx context.Context, reminder InternalActorReminder, metadata map[string][]string) error
	InvokeTimer(ctx context.Context, timer InternalActorReminder, metadata map[string][]string) error
	Completed() bool
}

InternalActor represents the interface for invoking an "internal" actor (one which is built into daprd directly).

type InternalActorFactory

type InternalActorFactory = func(actorType string, actorID string, actors Actors) InternalActor

InternalActorFactory is a function that allocates an internal actor.

type InternalActorReminder

type InternalActorReminder struct {
	ActorType string
	ActorID   string
	Name      string
	Data      []byte
	DueTime   string
	Period    string
}

func (InternalActorReminder) DecodeData

func (ir InternalActorReminder) DecodeData(dest any) error

DecodeData decodes internal actor reminder data payloads and stores the result in dest.

func (InternalActorReminder) Key

func (ir InternalActorReminder) Key() string

type OperationType

type OperationType string

OperationType describes a CRUD operation performed against a state store.

const (
	// Upsert is an update or create operation.
	Upsert OperationType = "upsert"
	// Delete is a delete operation.
	Delete OperationType = "delete"
)

type ReminderResponse

type ReminderResponse struct {
	Data    any    `json:"data"`
	DueTime string `json:"dueTime"`
	Period  string `json:"period"`
}

ReminderResponse is the payload that is sent to an Actor SDK API for execution.

func (*ReminderResponse) MarshalJSON

func (r *ReminderResponse) MarshalJSON() ([]byte, error)

MarshalJSON is a custom JSON marshaler that encodes the data as JSON. Actor SDKs expect "data" to be a base64-encoded message with the JSON representation of the data, so this makes sure that happens. This method implements the json.Marshaler interface.

type SaveStateRequest

type SaveStateRequest struct {
	ActorID   string `json:"actorId"`
	ActorType string `json:"actorType"`
	Key       string `json:"key"`
	Value     any    `json:"value"`
}

SaveStateRequest is the request object for saving an actor state.

type StateOperationOpts

type StateOperationOpts struct {
	Metadata    map[string]string
	ContentType *string

	// TODO: @joshvanl Remove in Dapr 1.12 when ActorStateTTL is finalized.
	StateTTLEnabled bool
}

Options for the StateOperation method

type StateResponse

type StateResponse struct {
	Data     []byte            `json:"data"`
	Metadata map[string]string `json:"metadata"`
}

StateResponse is the response returned from getting an actor state.

type TimerResponse

type TimerResponse struct {
	Callback string `json:"callback"`
	Data     any    `json:"data"`
	DueTime  string `json:"dueTime"`
	Period   string `json:"period"`
}

TimerResponse is the response object send to an Actor SDK API when a timer fires.

func (*TimerResponse) MarshalJSON

func (t *TimerResponse) MarshalJSON() ([]byte, error)

MarshalJSON is a custom JSON marshaler that encodes the data as JSON. Actor SDKs expect "data" to be a base64-encoded message with the JSON representation of the data, so this makes sure that happens. This method implements the json.Marshaler interface.

type TransactionalDelete

type TransactionalDelete struct {
	Key  string  `json:"key"`
	ETag *string `json:"etag,omitempty"`
}

TransactionalDelete defined a delete operation.

func (TransactionalDelete) StateOperation

func (t TransactionalDelete) StateOperation(baseKey string, opts StateOperationOpts) (op state.TransactionalStateOperation, err error)

StateOperation returns the state.TransactionalStateOperation object.

type TransactionalOperation

type TransactionalOperation struct {
	Operation OperationType `json:"operation"`
	Request   any           `json:"request"`
}

TransactionalOperation is the request object for a state operation participating in a transaction.

func (TransactionalOperation) StateOperation

func (t TransactionalOperation) StateOperation(baseKey string, opts StateOperationOpts) (op state.TransactionalStateOperation, err error)

StateOperation returns the state.TransactionalStateOperation object.

type TransactionalRequest

type TransactionalRequest struct {
	Operations []TransactionalOperation `json:"operations"`
	ActorType  string
	ActorID    string
}

TransactionalRequest describes a set of stateful operations for a given actor that are performed in a transactional manner.

func (TransactionalRequest) ActorKey

func (r TransactionalRequest) ActorKey() string

ActorKey returns the key of the actor for this request.

type TransactionalUpsert

type TransactionalUpsert struct {
	Key      string            `json:"key"`
	Value    any               `json:"value"`
	ETag     *string           `json:"etag,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

TransactionalUpsert defines a key/value pair for an upsert operation.

func (TransactionalUpsert) StateOperation

func (t TransactionalUpsert) StateOperation(baseKey string, opts StateOperationOpts) (op state.TransactionalStateOperation, err error)

StateOperation returns the state.TransactionalStateOperation object.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL