Documentation ¶
Overview ¶
Package actor declares the types used to represent actors in the Actor Model.
The actors model provide a high level abstraction for writing concurrent and distributed systems. This approach simplifies the burden imposed on engineers, such as explicit locks and concurrent access to shared state, as actors receive messages synchronously.
The following quote from Wikipedia distills the definition of an actor down to its essence
In response to a message that it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
Creating Actors ¶
Props provide the building blocks for declaring how actors should be created. The following example defines an actor using a function literal to process messages:
var props Props = actor.FromFunc(func(c Context) { // process messages })
Alternatively, a type which conforms to the Actor interface, by defining a single Receive method, can be used.
type MyActor struct {} func (a *MyActor) Receive(c Context) { // process messages } var props Props = actor.FromProducer(func() Actor { return &MyActor{} })
Spawn and SpawnNamed use the given props to create a running instances of an actor. Once spawned, the actor is ready to process incoming messages. To spawn an actor with a unique name, use
pid := actor.Spawn(props)
The result of calling Spawn is a unique PID or process identifier.
Each time an actor is spawned, a new mailbox is created and associated with the PID. Messages are sent to the mailbox and then forwarded to the actor to process.
Processing Messages ¶
An actor processes messages via its Receive handler. The signature of this function is:
Receive(c actor.Context)
The actor system guarantees that this method is called synchronously, therefore there is no requirement to protect shared state inside calls to this function.
Communicating With Actors ¶
A PID is the primary interface for sending messages to actors. The PID.Tell method is used to send an asynchronous message to the actor associated with the PID:
pid.Tell("Hello World")
Depending on the requirements, communication between actors can take place synchronously or asynchronously. Regardless of the circumstances, actors always communicate via a PID.
When sending a message using PID.Request or PID.RequestFuture, the actor which receives the message will respond using the Context.Sender method, which returns the PID of of the sender.
For synchronous communication, an actor will use a Future and wait for the result before continuing. To send a message to an actor and wait for a response, use the RequestFuture method, which returns a Future:
f := actor.RequestFuture(pid,"Hello", 50 * time.Millisecond) res, err := f.Result() // waits for pid to reply
Example ¶
Demonstrates how to create an actor using a function literal and how to send a message asynchronously
package main import ( "fmt" "time" "github.com/aergoio/aergo-actor/actor" ) func main() { var props *actor.Props = actor.FromFunc(func(c actor.Context) { if msg, ok := c.Message().(string); ok { fmt.Println(msg) // outputs "Hello World" } }) pid := actor.Spawn(props) pid.Tell("Hello World") time.Sleep(time.Millisecond * 100) pid.GracefulStop() // wait for the actor to stop }
Output: Hello World
Example (Synchronous) ¶
Demonstrates how to send a message from one actor to another and for the caller to wait for a response before proceeding
package main import ( "fmt" "sync" "github.com/aergoio/aergo-actor/actor" ) func main() { var wg sync.WaitGroup wg.Add(1) // callee will wait for the PING message callee := actor.Spawn(actor.FromFunc(func(c actor.Context) { if msg, ok := c.Message().(string); ok { fmt.Println(msg) // outputs PING c.Respond("PONG") } })) // caller will send a PING message and wait for the PONG caller := actor.Spawn(actor.FromFunc(func(c actor.Context) { switch msg := c.Message().(type) { // the first message an actor receives after it has started case *actor.Started: // send a PING to the callee, and specify the response // is sent to Self, which is this actor's PID c.Request(callee, "PING") case string: fmt.Println(msg) // PONG wg.Done() } })) wg.Wait() callee.GracefulStop() caller.GracefulStop() }
Output: PING PONG
Index ¶
- Variables
- func SetOptions(opts ...optionFn)
- func UnwrapEnvelope(message interface{}) (ReadonlyMessageHeader, interface{}, *PID)
- func WithDeadLetterSubscriber(fn func(evt interface{})) optionFn
- func WithSupervisorSubscriber(fn func(evt interface{})) optionFn
- type Actor
- type ActorFunc
- type AddressResolver
- type AutoReceiveMessage
- type Context
- type DeadLetterEvent
- type DeciderFunc
- type Directive
- type Failure
- type Future
- type InboundMiddleware
- type MessageEnvelope
- type MessageProducer
- type NotInfluenceReceiveTimeout
- type OutboundMiddleware
- type PID
- func (*PID) Descriptor() ([]byte, []int)
- func (this *PID) Equal(that interface{}) bool
- func (m *PID) GetAddress() string
- func (m *PID) GetId() string
- func (pid *PID) GracefulPoison()
- func (pid *PID) GracefulStop()
- func (m *PID) Marshal() (dAtA []byte, err error)
- func (m *PID) MarshalTo(dAtA []byte) (int, error)
- func (pid *PID) MsgNum() int32
- func (pid *PID) Poison()
- func (pid *PID) PoisonFuture() *Future
- func (*PID) ProtoMessage()
- func (pid *PID) Request(message interface{}, respondTo *PID)
- func (pid *PID) RequestFuture(message interface{}, timeout time.Duration) *Future
- func (pid *PID) RequestFuturePrefix(message interface{}, prefix string, timeout time.Duration) *Future
- func (m *PID) Reset()
- func (m *PID) Size() (n int)
- func (pid *PID) Stop()
- func (pid *PID) StopFuture() *Future
- func (pid *PID) String() string
- func (pid *PID) Tell(message interface{})
- func (m *PID) Unmarshal(dAtA []byte) error
- func (m *PID) XXX_DiscardUnknown()
- func (m *PID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *PID) XXX_Merge(src proto.Message)
- func (m *PID) XXX_Size() int
- func (m *PID) XXX_Unmarshal(b []byte) error
- type PIDSet
- func (p *PIDSet) Add(v *PID)
- func (p *PIDSet) Clear()
- func (p *PIDSet) Clone() *PIDSet
- func (p *PIDSet) Contains(v *PID) bool
- func (p *PIDSet) Empty() bool
- func (p *PIDSet) ForEach(f func(i int, pid PID))
- func (p *PIDSet) Len() int
- func (p *PIDSet) Remove(v *PID) bool
- func (p *PIDSet) Values() []PID
- type PoisonPill
- func (*PoisonPill) AutoReceiveMessage()
- func (*PoisonPill) Descriptor() ([]byte, []int)
- func (this *PoisonPill) Equal(that interface{}) bool
- func (m *PoisonPill) Marshal() (dAtA []byte, err error)
- func (m *PoisonPill) MarshalTo(dAtA []byte) (int, error)
- func (*PoisonPill) ProtoMessage()
- func (m *PoisonPill) Reset()
- func (m *PoisonPill) Size() (n int)
- func (this *PoisonPill) String() string
- func (m *PoisonPill) Unmarshal(dAtA []byte) error
- func (m *PoisonPill) XXX_DiscardUnknown()
- func (m *PoisonPill) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *PoisonPill) XXX_Merge(src proto.Message)
- func (m *PoisonPill) XXX_Size() int
- func (m *PoisonPill) XXX_Unmarshal(b []byte) error
- type Process
- type ProcessRegistryValue
- func (pr *ProcessRegistryValue) Add(process Process, id string) (*PID, bool)
- func (pr *ProcessRegistryValue) Get(pid *PID) (Process, bool)
- func (pr *ProcessRegistryValue) GetLocal(id string) (Process, bool)
- func (pr *ProcessRegistryValue) NextId() string
- func (pr *ProcessRegistryValue) RegisterAddressResolver(handler AddressResolver)
- func (pr *ProcessRegistryValue) Remove(pid *PID)
- type Producer
- type Props
- func (props *Props) WithDispatcher(dispatcher mailbox.Dispatcher) *Props
- func (props *Props) WithFunc(f ActorFunc) *Props
- func (props *Props) WithGuardian(guardian SupervisorStrategy) *Props
- func (props *Props) WithInstance(a Actor) *Propsdeprecated
- func (props *Props) WithMailbox(mailbox mailbox.Producer) *Props
- func (props *Props) WithMiddleware(middleware ...InboundMiddleware) *Props
- func (props *Props) WithOutboundMiddleware(middleware ...OutboundMiddleware) *Props
- func (props *Props) WithProducer(p Producer) *Props
- func (props *Props) WithSpawnFunc(spawn SpawnFunc) *Props
- func (props *Props) WithSupervisor(supervisor SupervisorStrategy) *Props
- type ReadonlyMessageHeader
- type ReceiveTimeout
- type Restart
- type RestartStatistics
- type Restarting
- type SenderFunc
- type SpawnFunc
- type Started
- type Stop
- func (*Stop) Descriptor() ([]byte, []int)
- func (this *Stop) Equal(that interface{}) bool
- func (m *Stop) Marshal() (dAtA []byte, err error)
- func (m *Stop) MarshalTo(dAtA []byte) (int, error)
- func (*Stop) ProtoMessage()
- func (m *Stop) Reset()
- func (m *Stop) Size() (n int)
- func (this *Stop) String() string
- func (*Stop) SystemMessage()
- func (m *Stop) Unmarshal(dAtA []byte) error
- func (m *Stop) XXX_DiscardUnknown()
- func (m *Stop) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Stop) XXX_Merge(src proto.Message)
- func (m *Stop) XXX_Size() int
- func (m *Stop) XXX_Unmarshal(b []byte) error
- type Stopped
- type Stopping
- type Supervisor
- type SupervisorEvent
- type SupervisorStrategy
- func DefaultSupervisorStrategy() SupervisorStrategy
- func NewAllForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
- func NewExponentialBackoffStrategy(backoffWindow time.Duration, initialBackoff time.Duration) SupervisorStrategy
- func NewOneForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
- func NewRestartingStrategy() SupervisorStrategy
- func RestartingSupervisorStrategy() SupervisorStrategy
- type SystemMessage
- type Terminated
- func (*Terminated) Descriptor() ([]byte, []int)
- func (this *Terminated) Equal(that interface{}) bool
- func (m *Terminated) GetAddressTerminated() bool
- func (m *Terminated) GetWho() *PID
- func (m *Terminated) Marshal() (dAtA []byte, err error)
- func (m *Terminated) MarshalTo(dAtA []byte) (int, error)
- func (*Terminated) ProtoMessage()
- func (m *Terminated) Reset()
- func (m *Terminated) Size() (n int)
- func (this *Terminated) String() string
- func (*Terminated) SystemMessage()
- func (m *Terminated) Unmarshal(dAtA []byte) error
- func (m *Terminated) XXX_DiscardUnknown()
- func (m *Terminated) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Terminated) XXX_Merge(src proto.Message)
- func (m *Terminated) XXX_Size() int
- func (m *Terminated) XXX_Unmarshal(b []byte) error
- type Unwatch
- func (*Unwatch) Descriptor() ([]byte, []int)
- func (this *Unwatch) Equal(that interface{}) bool
- func (m *Unwatch) GetWatcher() *PID
- func (m *Unwatch) Marshal() (dAtA []byte, err error)
- func (m *Unwatch) MarshalTo(dAtA []byte) (int, error)
- func (*Unwatch) ProtoMessage()
- func (m *Unwatch) Reset()
- func (m *Unwatch) Size() (n int)
- func (this *Unwatch) String() string
- func (*Unwatch) SystemMessage()
- func (m *Unwatch) Unmarshal(dAtA []byte) error
- func (m *Unwatch) XXX_DiscardUnknown()
- func (m *Unwatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Unwatch) XXX_Merge(src proto.Message)
- func (m *Unwatch) XXX_Size() int
- func (m *Unwatch) XXX_Unmarshal(b []byte) error
- type Watch
- func (*Watch) Descriptor() ([]byte, []int)
- func (this *Watch) Equal(that interface{}) bool
- func (m *Watch) GetWatcher() *PID
- func (m *Watch) Marshal() (dAtA []byte, err error)
- func (m *Watch) MarshalTo(dAtA []byte) (int, error)
- func (*Watch) ProtoMessage()
- func (m *Watch) Reset()
- func (m *Watch) Size() (n int)
- func (this *Watch) String() string
- func (*Watch) SystemMessage()
- func (m *Watch) Unmarshal(dAtA []byte) error
- func (m *Watch) XXX_DiscardUnknown()
- func (m *Watch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Watch) XXX_Merge(src proto.Message)
- func (m *Watch) XXX_Size() int
- func (m *Watch) XXX_Unmarshal(b []byte) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthProtos = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowProtos = fmt.Errorf("proto: integer overflow") )
var ErrNameExists = errors.New("spawn: name exists")
ErrNameExists is the error used when an existing name is used for spawning an actor.
var ErrTimeout = errors.New("future: timeout")
ErrTimeout is the error used when a future times out before receiving a result.
var ProcessRegistry = &ProcessRegistryValue{ Address: localAddress, LocalPIDs: cmap.New(), }
ProcessRegistry is a registry of all active processes.
NOTE: This should only be used for advanced scenarios
Functions ¶
func SetOptions ¶
func SetOptions(opts ...optionFn)
SetOptions is used to configure the actor system
func UnwrapEnvelope ¶
func UnwrapEnvelope(message interface{}) (ReadonlyMessageHeader, interface{}, *PID)
func WithDeadLetterSubscriber ¶
func WithDeadLetterSubscriber(fn func(evt interface{})) optionFn
WithDeadLetterSubscriber option replaces the default DeadLetterEvent event subscriber with fn.
fn will only receive *DeadLetterEvent messages
Specifying nil will clear the existing.
func WithSupervisorSubscriber ¶
func WithSupervisorSubscriber(fn func(evt interface{})) optionFn
WithSupervisorSubscriber option replaces the default SupervisorEvent event subscriber with fn.
fn will only receive *SupervisorEvent messages
Specifying nil will clear the existing.
Types ¶
type Actor ¶
type Actor interface {
Receive(c Context)
}
Actor is the interface that defines the Receive method.
Receive is sent messages to be processed from the mailbox associated with the instance of the actor
type ActorFunc ¶
type ActorFunc func(c Context)
The ActorFunc type is an adapter to allow the use of ordinary functions as actors to process messages
type AddressResolver ¶
An AddressResolver is used to resolve remote actors
type AutoReceiveMessage ¶
type AutoReceiveMessage interface {
AutoReceiveMessage()
}
An AutoReceiveMessage is a special kind of user message that will be handled in some way automatially by the actor
type Context ¶
type Context interface { // Watch registers the actor as a monitor for the specified PID Watch(pid *PID) // Unwatch unregisters the actor as a monitor for the specified PID Unwatch(pid *PID) // Message returns the current message to be processed Message() interface{} // Sender returns the PID of actor that sent currently processed message Sender() *PID //MessageHeader returns the meta information for the currently processed message MessageHeader() ReadonlyMessageHeader //Tell sends a message to the given PID Tell(pid *PID, message interface{}) //Forward forwards current message to the given PID Forward(pid *PID) //Request sends a message to the given PID and also provides a Sender PID Request(pid *PID, message interface{}) // RequestFuture sends a message to a given PID and returns a Future RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future // SetReceiveTimeout sets the inactivity timeout, after which a ReceiveTimeout message will be sent to the actor. // A duration of less than 1ms will disable the inactivity timer. // // If a message is received before the duration d, the timer will be reset. If the message conforms to // the NotInfluenceReceiveTimeout interface, the timer will not be reset SetReceiveTimeout(d time.Duration) // ReceiveTimeout returns the current timeout ReceiveTimeout() time.Duration // SetBehavior replaces the actors current behavior stack with the new behavior SetBehavior(behavior ActorFunc) // PushBehavior pushes the current behavior on the stack and sets the current Receive handler to the new behavior PushBehavior(behavior ActorFunc) // PopBehavior reverts to the previous Receive handler PopBehavior() // Self returns the PID for the current actor Self() *PID // Parent returns the PID for the current actors parent Parent() *PID // Spawn starts a new child actor based on props and named with a unique id Spawn(props *Props) *PID // SpawnPrefix starts a new child actor based on props and named using a prefix followed by a unique id SpawnPrefix(props *Props, prefix string) *PID // SpawnNamed starts a new child actor based on props and named using the specified name // // ErrNameExists will be returned if id already exists SpawnNamed(props *Props, id string) (*PID, error) // Returns a slice of the actors children Children() []*PID // Stash stashes the current message on a stack for reprocessing when the actor restarts Stash() // Respond sends a response to the to the current `Sender` // // If the Sender is nil, the actor will panic Respond(response interface{}) // Actor returns the actor associated with this context Actor() Actor AwaitFuture(f *Future, continuation func(res interface{}, err error)) }
Context contains contextual information for actors
Example (SetBehavior) ¶
SetBehavior allows an actor to change its Receive handler, providing basic support for state machines
package main import ( "fmt" "sync" "github.com/aergoio/aergo-actor/actor" ) type setBehaviorActor struct { *sync.WaitGroup } // Receive is the default message handler when an actor is started func (f *setBehaviorActor) Receive(context actor.Context) { if msg, ok := context.Message().(string); ok && msg == "other" { // Change actor's receive message handler to Other context.SetBehavior(f.Other) } } func (f *setBehaviorActor) Other(context actor.Context) { if msg, ok := context.Message().(string); ok && msg == "foo" { fmt.Println(msg) f.Done() } } // SetBehavior allows an actor to change its Receive handler, providing basic support for state machines func main() { wg := &sync.WaitGroup{} wg.Add(1) pid := actor.Spawn(actor.FromProducer(func() actor.Actor { return &setBehaviorActor{wg} })) defer pid.GracefulStop() pid.Tell("other") pid.Tell("foo") wg.Wait() }
Output: foo
Example (SetReceiveTimeout) ¶
SetReceiveTimeout allows an actor to be notified repeatedly if it does not receive any messages for a specified duration
package main import ( "fmt" "sync" "time" "github.com/aergoio/aergo-actor/actor" ) type setReceiveTimeoutActor struct { *sync.WaitGroup } // Receive is the default message handler when an actor is started func (f *setReceiveTimeoutActor) Receive(context actor.Context) { switch context.Message().(type) { case *actor.Started: // when the actor starts, set the receive timeout to 10 milliseconds. // // the system will send an *actor.ReceiveTimeout message every time the timeout // expires until SetReceiveTimeout is called again with a duration < 1 ms context.SetReceiveTimeout(10 * time.Millisecond) case *actor.ReceiveTimeout: fmt.Println("timed out") f.Done() } } // SetReceiveTimeout allows an actor to be notified repeatedly if it does not receive any messages // for a specified duration func main() { wg := &sync.WaitGroup{} wg.Add(1) pid := actor.Spawn(actor.FromProducer(func() actor.Actor { return &setReceiveTimeoutActor{wg} })) defer pid.GracefulStop() wg.Wait() // wait for the ReceiveTimeout message }
Output: timed out
type DeadLetterEvent ¶
type DeadLetterEvent struct { PID *PID // The invalid process, to which the message was sent Message interface{} // The message that could not be delivered Sender *PID // the process that sent the Message }
A DeadLetterEvent is published via event.Publish when a message is sent to a nonexistent PID
type DeciderFunc ¶
type DeciderFunc func(reason interface{}) Directive
DeciderFunc is a function which is called by a SupervisorStrategy
type Directive ¶
type Directive int
Directive is an enum for supervision actions
const ( // ResumeDirective instructs the supervisor to resume the actor and continue processing messages ResumeDirective Directive = iota // RestartDirective instructs the supervisor to discard the actor, replacing it with a new instance, // before processing additional messages RestartDirective // StopDirective instructs the supervisor to stop the actor StopDirective // EscalateDirective instructs the supervisor to escalate handling of the failure to the actor's parent supervisor EscalateDirective )
Directive determines how a supervisor should handle a faulting actor
func DefaultDecider ¶
func DefaultDecider(_ interface{}) Directive
DefaultDecider is a decider that will always restart the failing child actor
type Failure ¶
type Failure struct { Who *PID Reason interface{} RestartStats *RestartStatistics Message interface{} }
func (*Failure) SystemMessage ¶
func (*Failure) SystemMessage()
type Future ¶
type Future struct {
// contains filtered or unexported fields
}
func NewFuturePrefix ¶
NewFuturePrefix creates and returns a new actor.Future with a timeout of duration d and this generates process id using a given prefix string
func (*Future) PipeTo ¶
PipeTo forwards the result or error of the future to the specified pids
Example ¶
package main import ( "fmt" "sync" "time" "github.com/aergoio/aergo-actor/actor" ) func main() { var wg sync.WaitGroup wg.Add(1) // test actor that will be the target of the future PipeTo pid := actor.Spawn(actor.FromFunc(func(ctx actor.Context) { // check if the message is a string and therefore // the "hello world" message piped from the future if m, ok := ctx.Message().(string); ok { fmt.Println(m) wg.Done() } })) f := actor.NewFuture(50 * time.Millisecond) f.PipeTo(pid) // resolve the future and pipe to waiting actor f.PID().Tell("hello world") wg.Wait() }
Output: hello world
type InboundMiddleware ¶
type MessageEnvelope ¶
type MessageEnvelope struct { Header messageHeader Message interface{} Sender *PID }
func (*MessageEnvelope) GetHeader ¶
func (me *MessageEnvelope) GetHeader(key string) string
func (*MessageEnvelope) SetHeader ¶
func (me *MessageEnvelope) SetHeader(key string, value string)
type MessageProducer ¶
type MessageProducer interface { // Tell sends a messages asynchronously to the PID Tell(pid *PID, message interface{}) // Request sends a messages asynchronously to the PID. The actor may send a response back via respondTo, which is // available to the receiving actor via Context.Sender Request(pid *PID, message interface{}, respondTo *PID) // RequestFuture sends a message to a given PID and returns a Future RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future }
var (
EmptyContext MessageProducer = &rootMessageProducer{}
)
type NotInfluenceReceiveTimeout ¶
type NotInfluenceReceiveTimeout interface {
NotInfluenceReceiveTimeout()
}
NotInfluenceReceiveTimeout messages will not reset the ReceiveTimeout timer of an actor that receives the message
type OutboundMiddleware ¶
type OutboundMiddleware func(next SenderFunc) SenderFunc
type PID ¶
type PID struct { Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` Id string `protobuf:"bytes,2,opt,name=Id,proto3" json:"Id,omitempty"` // contains filtered or unexported fields }
func NewLocalPID ¶
NewLocalPID returns a new instance of the PID struct with the address preset
func Spawn ¶
Spawn starts a new actor based on props and named with a unique id
Example ¶
Spawn creates instances of actors, similar to 'new' or 'make' but for actors.
package main import ( "fmt" "sync" "github.com/aergoio/aergo-actor/actor" ) func main() { var wg sync.WaitGroup wg.Add(1) //define the actor props //props define the creation process of an actor props := actor.FromFunc(func(ctx actor.Context) { // check if the message is a *actor.Started message // this is the first message all actors get // actor.Started is received async and can be used // to initialize your actors initial state if _, ok := ctx.Message().(*actor.Started); ok { fmt.Println("hello world") wg.Done() } }) // spawn the actor based on the props actor.Spawn(props) wg.Wait() }
Output: hello world
func SpawnNamed ¶
SpawnNamed starts a new actor based on props and named using the specified name
If name exists, error will be ErrNameExists
Example ¶
Spawn creates instances of actors, similar to 'new' or 'make' but for actors.
package main import ( "fmt" "log" "sync" "github.com/aergoio/aergo-actor/actor" ) func main() { var wg sync.WaitGroup wg.Add(1) //define the actor props //props define the creation process of an actor props := actor.FromFunc(func(ctx actor.Context) { // check if the message is a *actor.Started message // this is the first message all actors get // actor.Started is received async and can be used // to initialize your actors initial state if _, ok := ctx.Message().(*actor.Started); ok { fmt.Println("hello world") wg.Done() } }) // spawn the actor based on the props _, err := actor.SpawnNamed(props, "my-actor") if err != nil { log.Fatal("The actor name is already in use") } wg.Wait() }
Output: hello world
func SpawnPrefix ¶
SpawnPrefix starts a new actor based on props and named using a prefix followed by a unique id
func (*PID) Descriptor ¶
func (*PID) GetAddress ¶
func (*PID) GracefulPoison ¶
func (pid *PID) GracefulPoison()
GracefulPoison will tell and wait actor to stop after processing current user messages in mailbox.
func (*PID) GracefulStop ¶
func (pid *PID) GracefulStop()
GracefulStop will wait actor to stop immediately regardless of existing user messages in mailbox
func (*PID) Poison ¶
func (pid *PID) Poison()
Poison will tell actor to stop after processing current user messages in mailbox.
func (*PID) PoisonFuture ¶
PoisonFuture will tell actor to stop after processing current user messages in mailbox, and return its future.
func (*PID) ProtoMessage ¶
func (*PID) ProtoMessage()
func (*PID) Request ¶
Request sends a messages asynchronously to the PID. The actor may send a response back via respondTo, which is available to the receiving actor via Context.Sender
func (*PID) RequestFuture ¶
RequestFuture sends a message to a given PID and returns a Future
func (*PID) RequestFuturePrefix ¶
func (pid *PID) RequestFuturePrefix(message interface{}, prefix string, timeout time.Duration) *Future
RequestFuturePrefix sends a message to a given PID and returns a Future and this internally uses the prefix to generate sender's pid for an ease of debugging
func (*PID) Stop ¶
func (pid *PID) Stop()
Stop will stop actor immediately regardless of existing user messages in mailbox.
func (*PID) StopFuture ¶
StopFuture will stop actor immediately regardless of existing user messages in mailbox, and return its future.
func (*PID) Tell ¶
func (pid *PID) Tell(message interface{})
Tell sends a messages asynchronously to the PID
func (*PID) XXX_DiscardUnknown ¶
func (m *PID) XXX_DiscardUnknown()
func (*PID) XXX_Unmarshal ¶
type PIDSet ¶
type PIDSet struct {
// contains filtered or unexported fields
}
type PoisonPill ¶
type PoisonPill struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
user messages
func (*PoisonPill) AutoReceiveMessage ¶
func (*PoisonPill) AutoReceiveMessage()
func (*PoisonPill) Descriptor ¶
func (*PoisonPill) Descriptor() ([]byte, []int)
func (*PoisonPill) Equal ¶
func (this *PoisonPill) Equal(that interface{}) bool
func (*PoisonPill) Marshal ¶
func (m *PoisonPill) Marshal() (dAtA []byte, err error)
func (*PoisonPill) ProtoMessage ¶
func (*PoisonPill) ProtoMessage()
func (*PoisonPill) Reset ¶
func (m *PoisonPill) Reset()
func (*PoisonPill) Size ¶
func (m *PoisonPill) Size() (n int)
func (*PoisonPill) String ¶
func (this *PoisonPill) String() string
func (*PoisonPill) Unmarshal ¶
func (m *PoisonPill) Unmarshal(dAtA []byte) error
func (*PoisonPill) XXX_DiscardUnknown ¶
func (m *PoisonPill) XXX_DiscardUnknown()
func (*PoisonPill) XXX_Marshal ¶
func (m *PoisonPill) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*PoisonPill) XXX_Merge ¶
func (dst *PoisonPill) XXX_Merge(src proto.Message)
func (*PoisonPill) XXX_Size ¶
func (m *PoisonPill) XXX_Size() int
func (*PoisonPill) XXX_Unmarshal ¶
func (m *PoisonPill) XXX_Unmarshal(b []byte) error
type Process ¶
type Process interface { SendUserMessage(pid *PID, message interface{}) SendSystemMessage(pid *PID, message interface{}) Stop(pid *PID) MsgNum() int32 }
A Process is an interface that defines the base contract for interaction of actors
type ProcessRegistryValue ¶
type ProcessRegistryValue struct { SequenceID uint64 Address string LocalPIDs cmap.ConcurrentMap RemoteHandlers []AddressResolver }
func (*ProcessRegistryValue) Add ¶
func (pr *ProcessRegistryValue) Add(process Process, id string) (*PID, bool)
func (*ProcessRegistryValue) GetLocal ¶
func (pr *ProcessRegistryValue) GetLocal(id string) (Process, bool)
func (*ProcessRegistryValue) NextId ¶
func (pr *ProcessRegistryValue) NextId() string
func (*ProcessRegistryValue) RegisterAddressResolver ¶
func (pr *ProcessRegistryValue) RegisterAddressResolver(handler AddressResolver)
func (*ProcessRegistryValue) Remove ¶
func (pr *ProcessRegistryValue) Remove(pid *PID)
type Props ¶
type Props struct {
// contains filtered or unexported fields
}
Props represents configuration to define how an actor should be created
func FromInstance
deprecated
func FromProducer ¶
FromProducer creates a props with the given actor producer assigned
func FromSpawnFunc ¶
func (*Props) WithDispatcher ¶
func (props *Props) WithDispatcher(dispatcher mailbox.Dispatcher) *Props
WithDispatcher assigns a dispatcher to the props
func (*Props) WithGuardian ¶
func (props *Props) WithGuardian(guardian SupervisorStrategy) *Props
WithGuardian assigns a guardian strategy to the props
func (*Props) WithInstance
deprecated
func (*Props) WithMailbox ¶
WithMailbox assigns the desired mailbox producer to the props
func (*Props) WithMiddleware ¶
func (props *Props) WithMiddleware(middleware ...InboundMiddleware) *Props
Assign one or more middlewares to the props
func (*Props) WithOutboundMiddleware ¶
func (props *Props) WithOutboundMiddleware(middleware ...OutboundMiddleware) *Props
func (*Props) WithProducer ¶
WithProducer assigns a actor producer to the props
func (*Props) WithSpawnFunc ¶
WithSpawnFunc assigns a custom spawn func to the props, this is mainly for internal usage
func (*Props) WithSupervisor ¶
func (props *Props) WithSupervisor(supervisor SupervisorStrategy) *Props
WithSupervisor assigns a supervision strategy to the props
type ReadonlyMessageHeader ¶
type ReceiveTimeout ¶
type ReceiveTimeout struct{}
A ReceiveTimeout message is sent to an actor after the Context.ReceiveTimeout duration has expired
type Restart ¶
type Restart struct{}
Restart is message sent by the actor system to control the lifecycle of an actor
func (*Restart) SystemMessage ¶
func (*Restart) SystemMessage()
type RestartStatistics ¶
type RestartStatistics struct {
// contains filtered or unexported fields
}
RestartStatistics keeps track of how many times an actor have restarted and when
func NewRestartStatistics ¶
func NewRestartStatistics() *RestartStatistics
NewRestartStatistics construct a RestartStatistics
func (*RestartStatistics) Fail ¶
func (rs *RestartStatistics) Fail()
Fail increases the associated actors failure count
func (*RestartStatistics) FailureCount ¶
func (rs *RestartStatistics) FailureCount() int
FailureCount returns failure count
func (*RestartStatistics) NumberOfFailures ¶
func (rs *RestartStatistics) NumberOfFailures(withinDuration time.Duration) int
NumberOfFailures returns number of failures within a given duration
func (*RestartStatistics) Reset ¶
func (rs *RestartStatistics) Reset()
Reset the associated actors failure count
type Restarting ¶
type Restarting struct{}
A Restarting message is sent to an actor when the actor is being restarted by the system due to a failure
func (*Restarting) AutoReceiveMessage ¶
func (*Restarting) AutoReceiveMessage()
type SenderFunc ¶
type SenderFunc func(c Context, target *PID, envelope *MessageEnvelope)
type SpawnFunc ¶
var DefaultSpawner SpawnFunc = spawn
DefaultSpawner conforms to Spawner and is used to spawn a local actor
type Started ¶
type Started struct{}
A Started message is sent to an actor once it has been started and ready to begin receiving messages.
func (*Started) SystemMessage ¶
func (*Started) SystemMessage()
type Stop ¶
type Stop struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Stop) Descriptor ¶
func (*Stop) ProtoMessage ¶
func (*Stop) ProtoMessage()
func (*Stop) SystemMessage ¶
func (*Stop) SystemMessage()
func (*Stop) XXX_DiscardUnknown ¶
func (m *Stop) XXX_DiscardUnknown()
func (*Stop) XXX_Unmarshal ¶
type Stopped ¶
type Stopped struct{}
A Stopped message is sent to the actor once it has been stopped. A stopped actor will receive no further messages
func (*Stopped) AutoReceiveMessage ¶
func (*Stopped) AutoReceiveMessage()
type Stopping ¶
type Stopping struct{}
A Stopping message is sent to an actor prior to the actor being stopped
func (*Stopping) AutoReceiveMessage ¶
func (*Stopping) AutoReceiveMessage()
type Supervisor ¶
type Supervisor interface { Children() []*PID EscalateFailure(reason interface{}, message interface{}) RestartChildren(pids ...*PID) StopChildren(pids ...*PID) ResumeChildren(pids ...*PID) }
Supervisor is an interface that is used by the SupervisorStrategy to manage child actor lifecycle
type SupervisorEvent ¶
SupervisorEvent is sent on the EventStream when a supervisor have applied a directive to a failing child actor
type SupervisorStrategy ¶
type SupervisorStrategy interface {
HandleFailure(supervisor Supervisor, child *PID, rs *RestartStatistics, reason interface{}, message interface{})
}
SupervisorStrategy is an interface that decides how to handle failing child actors
func DefaultSupervisorStrategy ¶
func DefaultSupervisorStrategy() SupervisorStrategy
func NewAllForOneStrategy ¶
func NewAllForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
NewAllForOneStrategy returns a new SupervisorStrategy which applies the given fault Directive from the decider to the failing child and all its children.
This strategy is appropriate when the children have a strong dependency, such that and any single one failing would place them all into a potentially invalid state.
func NewExponentialBackoffStrategy ¶
func NewExponentialBackoffStrategy(backoffWindow time.Duration, initialBackoff time.Duration) SupervisorStrategy
NewExponentialBackoffStrategy creates a new Supervisor strategy that restarts a faulting child using an exponential back off algorithm:
delay =
func NewOneForOneStrategy ¶
func NewOneForOneStrategy(maxNrOfRetries int, withinDuration time.Duration, decider DeciderFunc) SupervisorStrategy
NewOneForOneStrategy returns a new Supervisor strategy which applies the fault Directive from the decider to the failing child process.
This strategy is applicable if it is safe to handle a single child in isolation from its peers or dependents
func NewRestartingStrategy ¶
func NewRestartingStrategy() SupervisorStrategy
func RestartingSupervisorStrategy ¶
func RestartingSupervisorStrategy() SupervisorStrategy
type SystemMessage ¶
type SystemMessage interface {
SystemMessage()
}
A SystemMessage message is reserved for specific lifecycle messages used by the actor system
type Terminated ¶
type Terminated struct { Who *PID `protobuf:"bytes,1,opt,name=who" json:"who,omitempty"` AddressTerminated bool `protobuf:"varint,2,opt,name=address_terminated,json=addressTerminated,proto3" json:"address_terminated,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Terminated) Descriptor ¶
func (*Terminated) Descriptor() ([]byte, []int)
func (*Terminated) Equal ¶
func (this *Terminated) Equal(that interface{}) bool
func (*Terminated) GetAddressTerminated ¶
func (m *Terminated) GetAddressTerminated() bool
func (*Terminated) GetWho ¶
func (m *Terminated) GetWho() *PID
func (*Terminated) Marshal ¶
func (m *Terminated) Marshal() (dAtA []byte, err error)
func (*Terminated) ProtoMessage ¶
func (*Terminated) ProtoMessage()
func (*Terminated) Reset ¶
func (m *Terminated) Reset()
func (*Terminated) Size ¶
func (m *Terminated) Size() (n int)
func (*Terminated) String ¶
func (this *Terminated) String() string
func (*Terminated) SystemMessage ¶
func (*Terminated) SystemMessage()
func (*Terminated) Unmarshal ¶
func (m *Terminated) Unmarshal(dAtA []byte) error
func (*Terminated) XXX_DiscardUnknown ¶
func (m *Terminated) XXX_DiscardUnknown()
func (*Terminated) XXX_Marshal ¶
func (m *Terminated) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*Terminated) XXX_Merge ¶
func (dst *Terminated) XXX_Merge(src proto.Message)
func (*Terminated) XXX_Size ¶
func (m *Terminated) XXX_Size() int
func (*Terminated) XXX_Unmarshal ¶
func (m *Terminated) XXX_Unmarshal(b []byte) error
type Unwatch ¶
type Unwatch struct { Watcher *PID `protobuf:"bytes,1,opt,name=watcher" json:"watcher,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Unwatch) Descriptor ¶
func (*Unwatch) GetWatcher ¶
func (*Unwatch) ProtoMessage ¶
func (*Unwatch) ProtoMessage()
func (*Unwatch) SystemMessage ¶
func (*Unwatch) SystemMessage()
func (*Unwatch) XXX_DiscardUnknown ¶
func (m *Unwatch) XXX_DiscardUnknown()
func (*Unwatch) XXX_Marshal ¶
func (*Unwatch) XXX_Unmarshal ¶
type Watch ¶
type Watch struct { Watcher *PID `protobuf:"bytes,1,opt,name=watcher" json:"watcher,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_sizecache int32 `json:"-"` }
system messages
func (*Watch) Descriptor ¶
func (*Watch) GetWatcher ¶
func (*Watch) ProtoMessage ¶
func (*Watch) ProtoMessage()
func (*Watch) SystemMessage ¶
func (*Watch) SystemMessage()
func (*Watch) XXX_DiscardUnknown ¶
func (m *Watch) XXX_DiscardUnknown()
func (*Watch) XXX_Marshal ¶
func (*Watch) XXX_Unmarshal ¶
Source Files ¶
- actor.go
- behaviorstack.go
- child_restart_stats.go
- context.go
- deadletter.go
- directive.go
- directive_string.go
- doc.go
- future.go
- guardian.go
- local_context.go
- local_process.go
- log.go
- mailbox.go
- message_envelope.go
- message_producer.go
- messages.go
- middleware_chain.go
- options.go
- pid.go
- pidset.go
- process.go
- process_registry.go
- props.go
- protos.pb.go
- root_supervisor.go
- spawn.go
- strategy_all_for_one.go
- strategy_exponential_backoff.go
- strategy_one_for_one.go
- strategy_restarting.go
- supervision.go
- supervision_event.go