Documentation ¶
Index ¶
- Constants
- Variables
- type ActorDuplicateIdEvent
- type ActorInitializedEvent
- type ActorMaxRestartsExceededEvent
- type ActorRestartedEvent
- type ActorStartedEvent
- type ActorStoppedEvent
- type Context
- func (c *Context) Child(id string) *PID
- func (c *Context) Children() []*PID
- func (c *Context) Context() context.Context
- func (c *Context) Engine() *Engine
- func (c *Context) Forward(pid *PID)
- func (c *Context) GetPID(id string) *PID
- func (c *Context) Message() any
- func (c *Context) PID() *PID
- func (c *Context) Parent() *PID
- func (c *Context) Receiver() Receiver
- func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response
- func (c *Context) Respond(msg any)
- func (c *Context) Send(pid *PID, msg any)
- func (c *Context) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater
- func (c *Context) Sender() *PID
- func (c *Context) SpawnChild(p Producer, name string, opts ...OptFunc) *PID
- func (c *Context) SpawnChildFunc(f func(*Context), name string, opts ...OptFunc) *PID
- type DeadLetterEvent
- type Engine
- func (e *Engine) Address() string
- func (e *Engine) BroadcastEvent(msg any)
- func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup
- func (e *Engine) Request(pid *PID, msg any, timeout time.Duration) *Response
- func (e *Engine) Send(pid *PID, msg any)
- func (e *Engine) SendLocal(pid *PID, msg any, sender *PID)
- func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater
- func (e *Engine) SendWithSender(pid *PID, msg any, sender *PID)
- func (e *Engine) Spawn(p Producer, kind string, opts ...OptFunc) *PID
- func (e *Engine) SpawnFunc(f func(*Context), kind string, opts ...OptFunc) *PID
- func (e *Engine) SpawnProc(p Processer) *PID
- func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup
- func (e *Engine) Subscribe(pid *PID)
- func (e *Engine) Unsubscribe(pid *PID)
- type EngineConfig
- type EngineRemoteMissingEvent
- type Envelope
- type EventLogger
- type Inbox
- type Inboxer
- type Initialized
- type InternalError
- type MiddlewareFunc
- type OptFunc
- type Opts
- type PID
- func (pid *PID) Child(id string) *PID
- func (m *PID) CloneMessageVT() proto.Message
- func (m *PID) CloneVT() *PID
- func (*PID) Descriptor() ([]byte, []int)deprecated
- func (this *PID) EqualMessageVT(thatMsg proto.Message) bool
- func (this *PID) EqualVT(that *PID) bool
- func (pid *PID) Equals(other *PID) bool
- func (x *PID) GetAddress() string
- func (x *PID) GetID() string
- func (pid *PID) LookupKey() uint64
- func (m *PID) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *PID) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)
- func (m *PID) MarshalToVT(dAtA []byte) (int, error)
- func (m *PID) MarshalToVTStrict(dAtA []byte) (int, error)
- func (m *PID) MarshalVT() (dAtA []byte, err error)
- func (m *PID) MarshalVTStrict() (dAtA []byte, err error)
- func (*PID) ProtoMessage()
- func (x *PID) ProtoReflect() protoreflect.Message
- func (x *PID) Reset()
- func (m *PID) SizeVT() (n int)
- func (pid *PID) String() string
- func (m *PID) UnmarshalVT(dAtA []byte) error
- type Ping
- func (m *Ping) CloneMessageVT() proto.Message
- func (m *Ping) CloneVT() *Ping
- func (*Ping) Descriptor() ([]byte, []int)deprecated
- func (this *Ping) EqualMessageVT(thatMsg proto.Message) bool
- func (this *Ping) EqualVT(that *Ping) bool
- func (x *Ping) GetFrom() *PID
- func (m *Ping) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *Ping) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)
- func (m *Ping) MarshalToVT(dAtA []byte) (int, error)
- func (m *Ping) MarshalToVTStrict(dAtA []byte) (int, error)
- func (m *Ping) MarshalVT() (dAtA []byte, err error)
- func (m *Ping) MarshalVTStrict() (dAtA []byte, err error)
- func (*Ping) ProtoMessage()
- func (x *Ping) ProtoReflect() protoreflect.Message
- func (x *Ping) Reset()
- func (m *Ping) SizeVT() (n int)
- func (x *Ping) String() string
- func (m *Ping) UnmarshalVT(dAtA []byte) error
- type Pong
- func (m *Pong) CloneMessageVT() proto.Message
- func (m *Pong) CloneVT() *Pong
- func (*Pong) Descriptor() ([]byte, []int)deprecated
- func (this *Pong) EqualMessageVT(thatMsg proto.Message) bool
- func (this *Pong) EqualVT(that *Pong) bool
- func (x *Pong) GetFrom() *PID
- func (m *Pong) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *Pong) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)
- func (m *Pong) MarshalToVT(dAtA []byte) (int, error)
- func (m *Pong) MarshalToVTStrict(dAtA []byte) (int, error)
- func (m *Pong) MarshalVT() (dAtA []byte, err error)
- func (m *Pong) MarshalVTStrict() (dAtA []byte, err error)
- func (*Pong) ProtoMessage()
- func (x *Pong) ProtoReflect() protoreflect.Message
- func (x *Pong) Reset()
- func (m *Pong) SizeVT() (n int)
- func (x *Pong) String() string
- func (m *Pong) UnmarshalVT(dAtA []byte) error
- type Processer
- type Producer
- type ReceiveFunc
- type Receiver
- type Registry
- type RemoteUnreachableEvent
- type Remoter
- type Response
- type Scheduler
- type SendRepeater
- type Started
- type Stopped
Constants ¶
const LocalLookupAddr = "local"
Variables ¶
var ( ErrInvalidLength = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflow = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group") )
var File_actor_actor_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type ActorDuplicateIdEvent ¶
type ActorDuplicateIdEvent struct {
PID *PID
}
ActorDuplicateIdEvent gets published if we try to register the same name twice.
type ActorInitializedEvent ¶
ActorInitializedEvent is broadcasted over the eventStream before an actor received and processed its started event.
type ActorMaxRestartsExceededEvent ¶
ActorMaxRestartsExceededEvent gets created if an actor crashes too many times
type ActorRestartedEvent ¶
type ActorRestartedEvent struct { PID *PID Timestamp time.Time Stacktrace []byte Reason any Restarts int32 }
ActorRestartedEvent is broadcasted when an actor crashes and gets restarted
type ActorStartedEvent ¶
ActorStartedEvent is broadcasted over the eventStream each time a Receiver (Actor) is spawned and activated. This means, that at the point of receiving this event the Receiver (Actor) is ready to process messages.
type ActorStoppedEvent ¶
ActorStoppedEvent is broadcasted over the eventStream each time a process is terminated.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) Child ¶
Child will return the PID of the child (if any) by the given name/id. PID will be nil if it could not find it.
func (*Context) Context ¶
Context returns a context.Context, user defined on spawn or a context.Background as default
func (*Context) Forward ¶
Forward will forward the current received message to the given PID. This will also set the "forwarder" as the sender of the message.
func (*Context) GetPID ¶
GetPID returns the PID of the process found by the given id. Returns nil when it could not find any process..
func (*Context) Request ¶
See Engine.Request for information. This is just a helper function doing that calls Request on the underlying Engine. c.Engine().Request().
func (*Context) Respond ¶
Respond will sent the given message to the sender of the current received message.
func (*Context) Send ¶
Send will send the given message to the given PID. This will also set the sender of the message to the PID of the current Context. Hence, the receiver of the message can call Context.Sender() to know the PID of the process that sent this message.
func (*Context) SendRepeat ¶
SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (*Context) Sender ¶
Sender, when available, returns the PID of the process that sent the current received message.
func (*Context) SpawnChild ¶
SpawnChild will spawn the given Producer as a child of the current Context. If the parent process dies, all the children will be automatically shutdown gracefully. Hence, all children will receive the Stopped message.
type DeadLetterEvent ¶
DeadLetterEvent is delivered to the deadletter actor when a message can't be delivered to it's recipient
type Engine ¶
type Engine struct { Registry *Registry // contains filtered or unexported fields }
Engine represents the actor engine.
func NewEngine ¶
func NewEngine(config EngineConfig) (*Engine, error)
NewEngine returns a new actor Engine given an EngineConfig.
func (*Engine) Address ¶
Address returns the address of the actor engine. When there is no remote configured, the "local" address will be used, otherwise the listen address of the remote.
func (*Engine) BroadcastEvent ¶
BroadcastEvent will broadcast the given message over the eventstream, notifying all actors that are subscribed.
func (*Engine) Poison ¶
Poison will send a graceful poisonPill message to the process that is associated with the given PID. The process will shut down gracefully once it has processed all the messages in the inbox. If given a WaitGroup, it blocks till the process is completely shutdown.
func (*Engine) Request ¶
Request sends the given message to the given PID as a "Request", returning a response that will resolve in the future. Calling Response.Result() will block until the deadline is exceeded or the response is being resolved.
func (*Engine) Send ¶
Send sends the given message to the given PID. If the message cannot be delivered due to the fact that the given process is not registered. The message will be sent to the DeadLetter process instead.
func (*Engine) SendLocal ¶
SendLocal will send the given message to the given PID. If the recipient is not found in the registry, the message will be sent to the DeadLetter process instead. If there is no deadletter process registered, the function will panic.
func (*Engine) SendRepeat ¶
SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (*Engine) SendWithSender ¶
SendWithSender will send the given message to the given PID with the given sender. Receivers receiving this message can check the sender by calling Context.Sender().
func (*Engine) Spawn ¶
Spawn spawns a process that will producer by the given Producer and can be configured with the given opts.
func (*Engine) SpawnProc ¶
SpawnProc spawns the give Processer. This function is useful when working with custom created Processes. Take a look at the streamWriter as an example.
func (*Engine) Stop ¶
Stop will send a non-graceful poisonPill message to the process that is associated with the given PID. The process will shut down immediately, once it has processed the poisonPill messsage.
func (*Engine) Unsubscribe ¶
Unsubscribe will un subscribe the given PID from the event stream.
type EngineConfig ¶
type EngineConfig struct {
// contains filtered or unexported fields
}
EngineConfig holds the configuration of the engine.
func NewEngineConfig ¶
func NewEngineConfig() EngineConfig
NewEngineConfig returns a new default EngineConfig.
func (EngineConfig) WithRemote ¶
func (config EngineConfig) WithRemote(remote Remoter) EngineConfig
WithRemote sets the remote which will configure the engine so its capable to send and receive messages over the network.
type EngineRemoteMissingEvent ¶
EngineRemoteMissingEvent gets published if we try to send a message to a remote actor but the remote system is not available.
type EventLogger ¶
EventLogger is an interface that the various Events can choose to implement. If they do, the event stream will log these events to slog.
type Initialized ¶
type Initialized struct{}
type InternalError ¶
type MiddlewareFunc ¶
type MiddlewareFunc = func(ReceiveFunc) ReceiveFunc
type OptFunc ¶
type OptFunc func(*Opts)
func WithContext ¶
func WithInboxSize ¶
func WithMaxRestarts ¶
func WithMiddleware ¶
func WithMiddleware(mw ...MiddlewareFunc) OptFunc
func WithRestartDelay ¶
type Opts ¶
type Opts struct { Producer Producer Kind string ID string MaxRestarts int32 RestartDelay time.Duration InboxSize int Middleware []MiddlewareFunc Context context.Context }
func DefaultOpts ¶
DefaultOpts returns default options from the given Producer.
type PID ¶
type PID struct { Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` ID string `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"` // contains filtered or unexported fields }
func (*PID) CloneMessageVT ¶
func (*PID) Descriptor
deprecated
func (*PID) GetAddress ¶
func (*PID) MarshalToSizedBufferVTStrict ¶
func (*PID) MarshalVTStrict ¶
func (*PID) ProtoMessage ¶
func (*PID) ProtoMessage()
func (*PID) ProtoReflect ¶
func (x *PID) ProtoReflect() protoreflect.Message
func (*PID) UnmarshalVT ¶
type Ping ¶
type Ping struct { From *PID `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` // contains filtered or unexported fields }
func (*Ping) CloneMessageVT ¶
func (*Ping) Descriptor
deprecated
func (*Ping) MarshalToSizedBufferVT ¶
func (*Ping) MarshalToSizedBufferVTStrict ¶
func (*Ping) MarshalVTStrict ¶
func (*Ping) ProtoMessage ¶
func (*Ping) ProtoMessage()
func (*Ping) ProtoReflect ¶
func (x *Ping) ProtoReflect() protoreflect.Message
func (*Ping) UnmarshalVT ¶
type Pong ¶
type Pong struct { From *PID `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` // contains filtered or unexported fields }
func (*Pong) CloneMessageVT ¶
func (*Pong) Descriptor
deprecated
func (*Pong) MarshalToSizedBufferVT ¶
func (*Pong) MarshalToSizedBufferVTStrict ¶
func (*Pong) MarshalVTStrict ¶
func (*Pong) ProtoMessage ¶
func (*Pong) ProtoMessage()
func (*Pong) ProtoReflect ¶
func (x *Pong) ProtoReflect() protoreflect.Message
func (*Pong) UnmarshalVT ¶
type Processer ¶
type Processer interface { Start() PID() *PID Send(*PID, any, *PID) Invoke([]Envelope) Shutdown(*sync.WaitGroup) }
Processer is an interface the abstracts the way a process behaves.
type ReceiveFunc ¶
type ReceiveFunc = func(*Context)
type Receiver ¶
type Receiver interface {
Receive(*Context)
}
Receiver is an interface that can receive and process messages.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
type RemoteUnreachableEvent ¶
type RemoteUnreachableEvent struct { // The listen address of the remote we are trying to dial. ListenAddr string }
RemoteUnreachableEvent gets published when trying to send a message to an remote that is not reachable. The event will be published after we retry to dial it N times.
type Remoter ¶
type Remoter interface { Address() string Send(*PID, any, *PID) Start(*Engine) error Stop() *sync.WaitGroup }
Remoter is an interface that abstract a remote that is tied to an engine.
type Scheduler ¶
type Scheduler interface { Schedule(fn func()) Throughput() int }
func NewScheduler ¶
type SendRepeater ¶
type SendRepeater struct {
// contains filtered or unexported fields
}
SendRepeater is a struct that can be used to send a repeating message to a given PID. If you need to have an actor wake up periodically, you can use a SendRepeater. It is started by the SendRepeat method and stopped by it's Stop() method.