Documentation ¶
Index ¶
- Constants
- Variables
- type ActivationEvent
- type Config
- type Context
- func (c *Context) Child(id string) *PID
- func (c *Context) Children() []*PID
- func (c *Context) Engine() *Engine
- func (c *Context) Forward(pid *PID)
- func (c *Context) GetPID(name string, tags ...string) *PID
- func (c *Context) Message() any
- func (c *Context) PID() *PID
- func (c *Context) Parent() *PID
- func (c *Context) Receiver() Receiver
- func (c *Context) Request(pid *PID, msg any, timeout time.Duration) *Response
- func (c *Context) Respond(msg any)
- func (c *Context) Send(pid *PID, msg any)
- func (c *Context) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater
- func (c *Context) Sender() *PID
- func (c *Context) SpawnChild(p Producer, name string, opts ...OptFunc) *PID
- func (c *Context) SpawnChildFunc(f func(*Context), name string, opts ...OptFunc) *PID
- type DeadLetterEvent
- type Engine
- func (e *Engine) Address() string
- func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup)
- func (e *Engine) Request(pid *PID, msg any, timeout time.Duration) *Response
- func (e *Engine) Send(pid *PID, msg any)
- func (e *Engine) SendLocal(pid *PID, msg any, sender *PID)
- func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater
- func (e *Engine) SendWithSender(pid *PID, msg any, sender *PID)
- func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID
- func (e *Engine) SpawnFunc(f func(*Context), id string, opts ...OptFunc) *PID
- func (e *Engine) SpawnProc(p Processor) *PID
- func (e *Engine) WithRemote(r Remoter)
- type Envelope
- type EventStream
- type EventStreamFunc
- type EventSub
- type Inbox
- type Inboxer
- type Initialized
- type InternalError
- type MiddlewareFunc
- type OptFunc
- type Opts
- type PID
- func (pid *PID) Child(id string, tags ...string) *PID
- func (m *PID) CloneMessageVT() proto.Message
- func (m *PID) CloneVT() *PID
- func (*PID) Descriptor() ([]byte, []int)deprecated
- func (this *PID) EqualMessageVT(thatMsg proto.Message) bool
- func (this *PID) EqualVT(that *PID) bool
- func (pid *PID) Equals(other *PID) bool
- func (x *PID) GetAddress() string
- func (x *PID) GetID() string
- func (pid *PID) HasTag(tag string) bool
- func (pid *PID) LookupKey() uint64
- func (m *PID) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *PID) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error)
- func (m *PID) MarshalToVT(dAtA []byte) (int, error)
- func (m *PID) MarshalToVTStrict(dAtA []byte) (int, error)
- func (m *PID) MarshalVT() (dAtA []byte, err error)
- func (m *PID) MarshalVTStrict() (dAtA []byte, err error)
- func (*PID) ProtoMessage()
- func (x *PID) ProtoReflect() protoreflect.Message
- func (x *PID) Reset()
- func (m *PID) SizeVT() (n int)
- func (pid *PID) String() string
- func (m *PID) UnmarshalVT(dAtA []byte) error
- type Processor
- type Producer
- type ReceiveFunc
- type Receiver
- type Registry
- type Remoter
- type Response
- type SendRepeater
- type Started
- type Stopped
- type TerminationEvent
- type TestReceiveFunc
- type TestReceiver
Constants ¶
const LocalLookupAddr = "local"
Variables ¶
var ( ErrInvalidLength = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflow = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group") )
var File_actor_actor_proto protoreflect.FileDescriptor
var LOCK_OS_THREAD = true
Functions ¶
This section is empty.
Types ¶
type ActivationEvent ¶
type ActivationEvent struct {
PID *PID
}
ActivationEvent is broadcasted over the EventStream each time a Receiver is spawned and activated. This mean at the point of receiving this event the Receiver is ready to process messages.
type Config ¶
type Config struct { // PIDSeparator separates a process ID when printed out // in a string representation. The default separator is "/". // pid := NewPID("127.0.0.1:4000", "foo", "bar") // 127.0.0.1:4000/foo/bar PIDSeparator string }
Config holds configuration for the actor Engine.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) Child ¶
Child will return the PID of the child (if any) by the given name/id. PID will be nil if it could not find it.
func (*Context) Forward ¶
Forward will forward the current received message to the given PID. This will also set the "forwarder" as the sender of the message.
func (*Context) GetPID ¶
GetPID returns the PID of the process found by the given name and tags. Returns nil when it could not find any process..
func (*Context) Request ¶
See Engine.Request for information. This is just a helper function doing that calls Request on the underlying Engine. c.Engine().Request().
func (*Context) Respond ¶
Respond will sent the given message to the sender of the current received message.
func (*Context) Send ¶
Send will send the given message to the given PID. This will also set the sender of the message to the PID of the current Context. Hence, the receiver of the message can call Context.Sender() to know the PID of the process that sent this message.
func (*Context) SendRepeat ¶
SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (*Context) Sender ¶
Sender, when available, returns the PID of the process that sent the current received message.
func (*Context) SpawnChild ¶
SpawnChild will spawn the given Producer as a child of the current Context. If the parent process dies, all the children will be automatically shutdown gracefully. Hence, all children will receive the Stopped message.
type DeadLetterEvent ¶
DeadLetterEvent is broadcasted over the EventStream each time a message cannot be delivered to the target PID.
type Engine ¶
type Engine struct { EventStream *EventStream Registry *Registry // contains filtered or unexported fields }
Engine represents the actor engine.
func (*Engine) Address ¶
Address returns the address of the actor engine. When there is no remote configured, the "local" address will be used, otherwise the listen address of the remote.
func (*Engine) Poison ¶
Poison will send a poisonPill to the process that is associated with the given PID. The process will shut down once it processed all its messages before the poisonPill was received. If given a WaitGroup, you can wait till the process is completely shutdown.
func (*Engine) Request ¶
Request sends the given message to the given PID as a "Request", returning a response that will resolve in the future. Calling Response.Result() will block until the deadline is exceeded or the response is being resolved.
func (*Engine) Send ¶
Send sends the given message to the given PID. If the message cannot be delivered due to the fact that the given process is not registered. The message will be send to the DeadLetter process instead.
func (*Engine) SendRepeat ¶
SendRepeat will send the given message to the given PID each given interval. It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (*Engine) SendWithSender ¶
SendWithSender will send the given message to the given PID with the given sender. Receivers receiving this message can check the sender by calling Context.Sender().
func (*Engine) Spawn ¶
Spawn spawns a process that will producer by the given Producer and can be configured with the given opts.
func (*Engine) SpawnProc ¶
SpawnProc spawns the give Processor. This function is usefull when working with custom created Processes. Take a look at the streamWriter as an example.
func (*Engine) WithRemote ¶
WithRemote returns a new actor Engine with the given Remoter, and will call its Start function
type EventStream ¶
type EventStream struct {
// contains filtered or unexported fields
}
func NewEventStream ¶
func NewEventStream() *EventStream
func (*EventStream) Len ¶
func (e *EventStream) Len() int
func (*EventStream) Publish ¶
func (e *EventStream) Publish(msg any)
func (*EventStream) Subscribe ¶
func (e *EventStream) Subscribe(f EventStreamFunc) *EventSub
func (*EventStream) Unsubscribe ¶
func (e *EventStream) Unsubscribe(sub *EventSub)
type EventStreamFunc ¶
type EventStreamFunc func(event any)
type Initialized ¶
type Initialized struct{}
type InternalError ¶
type MiddlewareFunc ¶
type MiddlewareFunc = func(ReceiveFunc) ReceiveFunc
type OptFunc ¶
type OptFunc func(*Opts)
func WithInboxSize ¶
func WithMaxRestarts ¶
func WithMiddleware ¶
func WithMiddleware(mw ...MiddlewareFunc) OptFunc
func WithRestartDelay ¶
type Opts ¶
type Opts struct { Producer Producer Name string Tags []string MaxRestarts int32 RestartDelay time.Duration InboxSize int Middleware []MiddlewareFunc }
func DefaultOpts ¶
DefaultOpts returns default options from the given Producer.
type PID ¶
type PID struct { Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` ID string `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"` // contains filtered or unexported fields }
func NewPID ¶
NewPID returns a new Process ID given an address, name, and optional tags. TODO(@charlesderek) Can we even optimize this more?
func (*PID) CloneMessageVT ¶
func (*PID) Descriptor
deprecated
func (*PID) GetAddress ¶
func (*PID) MarshalToSizedBufferVTStrict ¶
func (*PID) MarshalVTStrict ¶
func (*PID) ProtoMessage ¶
func (*PID) ProtoMessage()
func (*PID) ProtoReflect ¶
func (x *PID) ProtoReflect() protoreflect.Message
func (*PID) UnmarshalVT ¶
type Processor ¶
type Processor interface { Start() PID() *PID Send(*PID, any, *PID) Invoke([]Envelope) Shutdown(*sync.WaitGroup) }
Processor is an interface the abstracts the way a process behaves.
type Producer ¶
type Producer func() Receiver
Producer is any function that can return a Receiver
func NewTestProducer ¶
func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer
type ReceiveFunc ¶
type ReceiveFunc = func(*Context)
type Receiver ¶
type Receiver interface {
Receive(*Context)
}
Receiver is an interface that can receive and process messages.
type SendRepeater ¶
type SendRepeater struct {
// contains filtered or unexported fields
}
func (SendRepeater) Stop ¶
func (sr SendRepeater) Stop()
type TerminationEvent ¶
type TerminationEvent struct {
PID *PID
}
TerminationEvent is broadcasted over the EventStream each time a process is terminated.
type TestReceiveFunc ¶
type TestReceiver ¶
type TestReceiver struct { OnReceive TestReceiveFunc // contains filtered or unexported fields }
func (*TestReceiver) Receive ¶
func (r *TestReceiver) Receive(ctx *Context)