Documentation ¶
Index ¶
- Constants
- Variables
- type Application
- type ApplicationBehavior
- type ApplicationChildSpec
- type ApplicationInfo
- type ApplicationSpec
- type ApplicationStartType
- type CancelFunc
- type Core
- type DirectStatus
- type EnvKey
- type Event
- type EventMessage
- type MessageDirectChildren
- type MessageDown
- type MessageEventDown
- type MessageExit
- type MessageFallback
- type MessageNodeDown
- type MessageProxyDown
- type MessageSagaCancel
- type MessageSagaError
- type Pool
- func (p *Pool) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (p *Pool) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (p *Pool) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (p *Pool) Init(process *ServerProcess, args ...etf.Term) error
- type PoolBehavior
- type PoolOptions
- type PoolProcess
- type PoolWorker
- func (pw *PoolWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (pw *PoolWorker) HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term
- func (pw *PoolWorker) HandleWorkerCast(process *PoolWorkerProcess, message etf.Term)
- func (pw *PoolWorker) HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term)
- func (pw *PoolWorker) Init(process *ServerProcess, args ...etf.Term) error
- type PoolWorkerBehavior
- type PoolWorkerProcess
- type Process
- type ProcessBehavior
- type ProcessChannels
- type ProcessDirectMessage
- type ProcessFallback
- type ProcessGracefulExitRequest
- type ProcessID
- type ProcessInfo
- type ProcessMailboxMessage
- type ProcessOptions
- type ProcessState
- type Raft
- func (r *Raft) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (r *Raft) HandleCancel(process *RaftProcess, ref etf.Ref, reason string) RaftStatus
- func (r *Raft) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (r *Raft) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (r *Raft) HandleLeader(process *RaftProcess, leader *RaftLeader) RaftStatus
- func (r *Raft) HandlePeer(process *RaftProcess, peer etf.Pid, serial uint64) RaftStatus
- func (r *Raft) HandleQuorum(process *RaftProcess, quorum *RaftQuorum) RaftStatus
- func (r *Raft) HandleRaftCall(process *RaftProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (r *Raft) HandleRaftCast(process *RaftProcess, message etf.Term) ServerStatus
- func (r *Raft) HandleRaftDirect(process *RaftProcess, message interface{}) (interface{}, error)
- func (r *Raft) HandleRaftInfo(process *RaftProcess, message etf.Term) ServerStatus
- func (r *Raft) HandleSerial(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus
- func (r *Raft) Init(process *ServerProcess, args ...etf.Term) error
- type RaftBehavior
- type RaftLeader
- type RaftOptions
- type RaftProcess
- func (rp *RaftProcess) Append(key string, value etf.Term) (etf.Ref, error)
- func (rp *RaftProcess) AppendWithTimeout(key string, value etf.Term, timeout int) (etf.Ref, error)
- func (rp *RaftProcess) Get(serial uint64) (etf.Ref, error)
- func (rp *RaftProcess) GetWithTimeout(serial uint64, timeout int) (etf.Ref, error)
- func (rp *RaftProcess) Join(peer interface{}) error
- func (rp *RaftProcess) Leader() *RaftLeader
- func (rp *RaftProcess) Peers() []etf.Pid
- func (rp *RaftProcess) Quorum() *RaftQuorum
- func (rp *RaftProcess) Serial() uint64
- type RaftQuorum
- type RaftQuorumState
- type RaftStatus
- type RegisteredBehavior
- type RemoteSpawnOptions
- type RemoteSpawnRequest
- type Saga
- func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus
- func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, ...) SagaStatus
- func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
- func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleSagaDirect(process *SagaProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
- func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus
- func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)
- func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, ...) SagaStatus
- func (gs *Saga) Init(process *ServerProcess, args ...etf.Term) error
- func (gs *Saga) SetMaxTransactions(process Process, max uint) error
- type SagaBehavior
- type SagaJob
- type SagaJobID
- type SagaJobOptions
- type SagaNext
- type SagaNextID
- type SagaOptions
- type SagaProcess
- func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error
- func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error
- func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)
- func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error
- func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error
- func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)
- func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID
- type SagaStatus
- type SagaTransaction
- type SagaTransactionID
- type SagaTransactionOptions
- type SagaWorker
- func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})
- func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
- func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
- func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error
- func (w *SagaWorker) Terminate(process *ServerProcess, reason string)
- type SagaWorkerBehavior
- type SagaWorkerProcess
- type Server
- func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Server) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gs *Server) Init(process *ServerProcess, args ...etf.Term) error
- func (gs *Server) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
- func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string
- func (gs *Server) Terminate(process *ServerProcess, reason string)
- type ServerBehavior
- type ServerFrom
- type ServerProcess
- func (sp *ServerProcess) Call(to interface{}, message etf.Term) (etf.Term, error)
- func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)
- func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error
- func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) CancelFunc
- func (sp *ServerProcess) MessageCounter() uint64
- func (sp *ServerProcess) Reply(ref etf.Ref, reply etf.Term, err error) error
- func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error
- type ServerStatus
- type Stage
- func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus
- func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus
- func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)
- func (gst *Stage) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus
- func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleStageDirect(process *StageProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
- func (gst *Stage) HandleStageTerminate(process *StageProcess, reason string)
- func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, ...) StageStatus
- func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, ...) (bool, StageStatus)
- func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error
- func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error
- func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error
- func (gst *Stage) Terminate(process *ServerProcess, reason string)
- type StageBehavior
- type StageCancelMode
- type StageCancelReason
- type StageDispatchItem
- type StageDispatcher
- type StageDispatcherBehavior
- type StageOptions
- type StageProcess
- func (p *StageProcess) Ask(subscription StageSubscription, count uint) error
- func (p *StageProcess) AutoDemand(subscription StageSubscription) (bool, error)
- func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error
- func (p *StageProcess) CancelMode(subscription StageSubscription) (StageCancelMode, error)
- func (p *StageProcess) DemandHandle() bool
- func (p *StageProcess) SendEvents(events etf.List) error
- func (p *StageProcess) SetAutoDemand(subscription StageSubscription, autodemand bool) error
- func (p *StageProcess) SetCancelMode(subscription StageSubscription, mode StageCancelMode) error
- func (p *StageProcess) SetDemandHandle(enable bool)
- func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)
- type StageStatus
- type StageSubscribeOptions
- type StageSubscription
- type Supervisor
- type SupervisorBehavior
- type SupervisorChildSpec
- type SupervisorSpec
- type SupervisorStrategy
- type SupervisorStrategyRestart
- type SupervisorStrategyType
- type TCP
- func (tcp *TCP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (tcp *TCP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (tcp *TCP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (tcp *TCP) HandleTCPCall(process *TCPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (tcp *TCP) HandleTCPCast(process *TCPProcess, message etf.Term) ServerStatus
- func (tcp *TCP) HandleTCPInfo(process *TCPProcess, message etf.Term) ServerStatus
- func (tcp *TCP) HandleTCPTerminate(process *TCPProcess, reason string)
- func (tcp *TCP) Init(process *ServerProcess, args ...etf.Term) error
- func (tcp *TCP) Terminate(process *ServerProcess, reason string)
- type TCPBehavior
- type TCPConnection
- type TCPHandler
- func (tcph *TCPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (tcph *TCPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (tcph *TCPHandler) HandleConnect(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus
- func (tcph *TCPHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (tcph *TCPHandler) HandleDisconnect(process *TCPHandlerProcess, conn *TCPConnection)
- func (tcph *TCPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (tcph *TCPHandler) HandleTCPHandlerCall(process *TCPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (tcph *TCPHandler) HandleTCPHandlerCast(process *TCPHandlerProcess, message etf.Term) ServerStatus
- func (tcph *TCPHandler) HandleTCPHandlerInfo(process *TCPHandlerProcess, message etf.Term) ServerStatus
- func (tcph *TCPHandler) HandleTCPHandlerTerminate(process *TCPHandlerProcess, reason string)
- func (tcph *TCPHandler) HandleTimeout(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus
- func (tcph *TCPHandler) Init(process *ServerProcess, args ...etf.Term) error
- func (tcph *TCPHandler) Terminate(process *ServerProcess, reason string)
- type TCPHandlerBehavior
- type TCPHandlerProcess
- type TCPHandlerStatus
- type TCPOptions
- type TCPProcess
- type TCPStatus
- type UDP
- func (udp *UDP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (udp *UDP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (udp *UDP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (udp *UDP) HandleUDPCall(process *UDPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (udp *UDP) HandleUDPCast(process *UDPProcess, message etf.Term) ServerStatus
- func (udp *UDP) HandleUDPInfo(process *UDPProcess, message etf.Term) ServerStatus
- func (udp *UDP) HandleUDPTerminate(process *UDPProcess, reason string)
- func (udp *UDP) Init(process *ServerProcess, args ...etf.Term) error
- func (udp *UDP) Terminate(process *ServerProcess, reason string)
- type UDPBehavior
- type UDPHandler
- func (udph *UDPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (udph *UDPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (udph *UDPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (udph *UDPHandler) HandleTimeout(process *UDPHandlerProcess)
- func (udph *UDPHandler) HandleUDPHandlerCall(process *UDPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (udph *UDPHandler) HandleUDPHandlerCast(process *UDPHandlerProcess, message etf.Term) ServerStatus
- func (udph *UDPHandler) HandleUDPHandlerInfo(process *UDPHandlerProcess, message etf.Term) ServerStatus
- func (udph *UDPHandler) HandleUDPHandlerTerminate(process *UDPHandlerProcess, reason string)
- func (udph *UDPHandler) Init(process *ServerProcess, args ...etf.Term) error
- func (udph *UDPHandler) Terminate(process *ServerProcess, reason string)
- type UDPHandlerBehavior
- type UDPHandlerProcess
- type UDPOptions
- type UDPPacket
- type UDPProcess
- type UDPStatus
- type Web
- func (web *Web) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (web *Web) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (web *Web) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (web *Web) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (web *Web) HandleWebCall(process *WebProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (web *Web) HandleWebCast(process *WebProcess, message etf.Term) ServerStatus
- func (web *Web) HandleWebInfo(process *WebProcess, message etf.Term) ServerStatus
- func (web *Web) Init(process *ServerProcess, args ...etf.Term) error
- func (web *Web) Terminate(process *ServerProcess, reason string)
- type WebBehavior
- type WebHandler
- func (wh *WebHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (wh *WebHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
- func (wh *WebHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
- func (wh *WebHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
- func (wh *WebHandler) HandleWebHandlerCall(process *WebHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
- func (wh *WebHandler) HandleWebHandlerCast(process *WebHandlerProcess, message etf.Term) ServerStatus
- func (wh *WebHandler) HandleWebHandlerInfo(process *WebHandlerProcess, message etf.Term) ServerStatus
- func (wh *WebHandler) HandleWebHandlerTerminate(process *WebHandlerProcess, reason string, count int64)
- func (wh *WebHandler) Init(process *ServerProcess, args ...etf.Term) error
- func (wh *WebHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (wh *WebHandler) Terminate(process *ServerProcess, reason string)
- type WebHandlerBehavior
- type WebHandlerOptions
- type WebHandlerProcess
- type WebHandlerStatus
- type WebMessageRequest
- type WebOptions
- type WebProcess
- type WebStatus
Constants ¶
const ( // ApplicationStartPermanent If a permanent application terminates, // all other applications and the runtime system (node) are also terminated. ApplicationStartPermanent ApplicationStartType = "permanent" // ApplicationStartTemporary If a temporary application terminates, // this is reported but no other applications are terminated. ApplicationStartTemporary ApplicationStartType = "temporary" // ApplicationStartTransient If a transient application terminates // with reason normal, this is reported but no other applications are // terminated. If a transient application terminates abnormally, that // is with any other reason than normal, all other applications and // the runtime system (node) are also terminated. ApplicationStartTransient ApplicationStartType = "transient" // EnvKeyAppSpec EnvKeyAppSpec EnvKey = "ergo:AppSpec" )
const ( DefaultRaftGetTimeout = 5 // in seconds DefaultRaftAppendTimeout = 5 // in seconds DefaultRaftHeartbeat = 3 // in seconds )
const ( // SupervisorRestartIntensity SupervisorRestartIntensity = uint16(10) // SupervisorRestartPeriod SupervisorRestartPeriod = uint16(10) // SupervisorStrategyOneForOne If one child process terminates and is to be restarted, only // that child process is affected. This is the default restart strategy. SupervisorStrategyOneForOne = SupervisorStrategyType("one_for_one") // SupervisorStrategyOneForAll If one child process terminates and is to be restarted, all other // child processes are terminated and then all child processes are restarted. SupervisorStrategyOneForAll = SupervisorStrategyType("one_for_all") // SupervisorStrategyRestForOne If one child process terminates and is to be restarted, // the 'rest' of the child processes (that is, the child // processes after the terminated child process in the start order) // are terminated. Then the terminated child process and all // child processes after it are restarted SupervisorStrategyRestForOne = SupervisorStrategyType("rest_for_one") // SupervisorStrategySimpleOneForOne A simplified one_for_one supervisor, where all // child processes are dynamically added instances // of the same process type, that is, running the same code. SupervisorStrategySimpleOneForOne = SupervisorStrategyType("simple_one_for_one") // SupervisorStrategyRestartPermanent child process is always restarted SupervisorStrategyRestartPermanent = SupervisorStrategyRestart("permanent") // SupervisorStrategyRestartTemporary child process is never restarted // (not even when the supervisor restart strategy is rest_for_one // or one_for_all and a sibling death causes the temporary process // to be terminated) SupervisorStrategyRestartTemporary = SupervisorStrategyRestart("temporary") // SupervisorStrategyRestartTransient child process is restarted only if // it terminates abnormally, that is, with an exit reason other // than normal, shutdown. SupervisorStrategyRestartTransient = SupervisorStrategyRestart("transient") )
const (
DefaultCallTimeout = 5
)
Variables ¶
var ( ErrRaftState = fmt.Errorf("incorrect raft state") ErrRaftNoQuorum = fmt.Errorf("no quorum") ErrRaftNoLeader = fmt.Errorf("no leader") ErrRaftNoSerial = fmt.Errorf("no peers with requested serial") ErrRaftBusy = fmt.Errorf("another append request is in progress") ErrRaftWrongTimeout = fmt.Errorf("wrong timeout value") )
var ( RaftStatusOK RaftStatus // nil RaftStatusStop RaftStatus = fmt.Errorf("stop") RaftStatusDiscard RaftStatus = fmt.Errorf("discard") RaftQuorumState3 RaftQuorumState = 3 // minimum quorum that could make leader election RaftQuorumState5 RaftQuorumState = 5 RaftQuorumState7 RaftQuorumState = 7 RaftQuorumState9 RaftQuorumState = 9 RaftQuorumState11 RaftQuorumState = 11 // maximal quorum )
var ( SagaStatusOK SagaStatus // nil SagaStatusStop SagaStatus = fmt.Errorf("stop") ErrSagaTxEndOfLifespan = fmt.Errorf("End of TX lifespan") ErrSagaTxNextTimeout = fmt.Errorf("Next saga timeout") ErrSagaUnknown = fmt.Errorf("Unknown saga") ErrSagaJobUnknown = fmt.Errorf("Unknown job") ErrSagaTxUnknown = fmt.Errorf("Unknown TX") ErrSagaTxCanceled = fmt.Errorf("Tx is canceled") ErrSagaTxInProgress = fmt.Errorf("Tx is still in progress") ErrSagaResultAlreadySent = fmt.Errorf("Result is already sent") ErrSagaNotAllowed = fmt.Errorf("Operation is not allowed") )
var ( ServerStatusOK ServerStatus = nil ServerStatusStop ServerStatus = fmt.Errorf("stop") ServerStatusIgnore ServerStatus = fmt.Errorf("ignore") DirectStatusOK DirectStatus = nil DirectStatusIgnore DirectStatus = fmt.Errorf("ignore") )
Functions ¶
This section is empty.
Types ¶
type Application ¶
type Application struct{}
Application is an implementation of the ProcessBehavior interface.
func (*Application) ProcessInit ¶
func (a *Application) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
ProcessInit
func (*Application) ProcessLoop ¶
func (a *Application) ProcessLoop(ps ProcessState, started chan<- bool) string
ProcessLoop
type ApplicationBehavior ¶
type ApplicationBehavior interface { ProcessBehavior Load(args ...etf.Term) (ApplicationSpec, error) Start(process Process, args ...etf.Term) }
ApplicationBehavior interface
type ApplicationChildSpec ¶
type ApplicationChildSpec struct { Child ProcessBehavior Options ProcessOptions Name string Args []etf.Term // contains filtered or unexported fields }
ApplicationChildSpec
type ApplicationInfo ¶
ApplicationInfo
type ApplicationSpec ¶
type ApplicationSpec struct { sync.Mutex Name string Description string Version string Lifespan time.Duration Applications []string Env map[EnvKey]interface{} Children []ApplicationChildSpec Process Process StartType ApplicationStartType }
ApplicationSpec
type ApplicationStartType ¶
type ApplicationStartType = string
type CancelFunc ¶
type CancelFunc func() bool
type Core ¶
type Core interface { // ProcessByName returns Process for the given name. // Returns nil if it doesn't exist (not found) or terminated. ProcessByName(name string) Process // ProcessByPid returns Process for the given Pid. // Returns nil if it doesn't exist (not found) or terminated. ProcessByPid(pid etf.Pid) Process // ProcessByAlias returns Process for the given alias. // Returns nil if it doesn't exist (not found) or terminated ProcessByAlias(alias etf.Alias) Process // ProcessInfo returns the details about given Pid ProcessInfo(pid etf.Pid) (ProcessInfo, error) // ProcessList returns the list of running processes ProcessList() []Process // MakeRef creates a unique reference within this node MakeRef() etf.Ref // IsAlias checks whether the given alias belongs to an alive process on this node. // If the process died all aliases are cleaned up and this function returns // false for the given alias. For an alias from a remote node it always returns false. IsAlias(etf.Alias) bool // IsMonitor returns true if the given reference is a monitor IsMonitor(ref etf.Ref) bool // RegisterBehavior RegisterBehavior(group, name string, behavior ProcessBehavior, data interface{}) error // RegisteredBehavior RegisteredBehavior(group, name string) (RegisteredBehavior, error) // RegisteredBehaviorGroup RegisteredBehaviorGroup(group string) []RegisteredBehavior // UnregisterBehavior UnregisterBehavior(group, name string) error }
Core is the common set of methods provided by the Process and node.Node interfaces.
type DirectStatus ¶
type DirectStatus error
type EventMessage ¶
type EventMessage interface{}
type MessageDirectChildren ¶
type MessageDirectChildren struct{}
MessageDirectChildren is a type intended to be used in Process.Children, which returns []etf.Pid. You can handle this type of message in your HandleDirect callback to enable Process.Children support for your gen.Server actor.
type MessageDown ¶
type MessageDown struct { Ref etf.Ref // a monitor reference ProcessID ProcessID // if monitor was created by name Pid etf.Pid Reason string }
MessageDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorProcess. Reason values:
- the exit reason of the process
- 'noproc' (process did not exist at the time of monitor creation)
- 'noconnection' (no connection to the node where the monitored process resides)
- 'noproxy' (no connection to the proxy through which this node had a connection; the monitored process could still be alive)
type MessageEventDown ¶
MessageEventDown is delivered to the process that monitored an EventType if the owner of this EventType has terminated.
type MessageExit ¶
MessageExit is delivered to the Server's HandleInfo callback when trap exit is enabled using SetTrapExit(true). Reason values:
- the exit reason of the process
- 'noproc' (process did not exist at the time of link creation)
- 'noconnection' (no connection to the node where the linked process resides)
- 'noproxy' (no connection to the proxy through which this node had a connection; the linked process could still be alive)
type MessageFallback ¶
MessageFallback is delivered to the process specified as a fallback process in ProcessOptions.Fallback.Name if the mailbox has overflowed.
func IsMessageFallback ¶
func IsMessageFallback(message etf.Term) (MessageFallback, bool)
IsMessageFallback
type MessageNodeDown ¶
MessageNodeDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode.
type MessageProxyDown ¶
MessageProxyDown is delivered as a message to the Server's HandleInfo callback of the process that created the monitor using MonitorNode, if the connection to the node went through proxy nodes and one of them went down.
func IsMessageProxyDown ¶
func IsMessageProxyDown(message etf.Term) (MessageProxyDown, bool)
IsMessageProxyDown
type MessageSagaCancel ¶
type MessageSagaCancel struct { TransactionID SagaTransactionID NextID SagaNextID Reason string }
MessageSagaCancel
type MessageSagaError ¶
type MessageSagaError struct { TransactionID SagaTransactionID NextID SagaNextID Error string Details string }
MessageSagaError
type Pool ¶ added in v1.999.222
type Pool struct {
Server
}
func (*Pool) HandleCall ¶ added in v1.999.222
func (p *Pool) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Pool) HandleCast ¶ added in v1.999.222
func (p *Pool) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*Pool) HandleInfo ¶ added in v1.999.222
func (p *Pool) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
type PoolBehavior ¶ added in v1.999.222
type PoolBehavior interface { ServerBehavior InitPool(process *PoolProcess, args ...etf.Term) (PoolOptions, error) }
type PoolOptions ¶ added in v1.999.222
type PoolOptions struct { NumWorkers int Worker PoolWorkerBehavior WorkerOptions ProcessOptions WorkerArgs []etf.Term }
type PoolProcess ¶ added in v1.999.222
type PoolProcess struct { ServerProcess // contains filtered or unexported fields }
type PoolWorker ¶ added in v1.999.222
type PoolWorker struct {
Server
}
func (*PoolWorker) HandleInfo ¶ added in v1.999.222
func (pw *PoolWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*PoolWorker) HandleWorkerCall ¶ added in v1.999.222
func (pw *PoolWorker) HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term
HandleWorkerCall
func (*PoolWorker) HandleWorkerCast ¶ added in v1.999.222
func (pw *PoolWorker) HandleWorkerCast(process *PoolWorkerProcess, message etf.Term)
HandleWorkerCast
func (*PoolWorker) HandleWorkerInfo ¶ added in v1.999.222
func (pw *PoolWorker) HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term)
HandleWorkerInfo
func (*PoolWorker) Init ¶ added in v1.999.222
func (pw *PoolWorker) Init(process *ServerProcess, args ...etf.Term) error
type PoolWorkerBehavior ¶ added in v1.999.222
type PoolWorkerBehavior interface { ServerBehavior InitPoolWorker(process *PoolWorkerProcess, args ...etf.Term) error HandleWorkerInfo(process *PoolWorkerProcess, message etf.Term) HandleWorkerCast(process *PoolWorkerProcess, message etf.Term) HandleWorkerCall(process *PoolWorkerProcess, message etf.Term) etf.Term }
type PoolWorkerProcess ¶ added in v1.999.222
type PoolWorkerProcess struct {
ServerProcess
}
type Process ¶
type Process interface { Core // Spawn create a new process with parent Spawn(name string, opts ProcessOptions, object ProcessBehavior, args ...etf.Term) (Process, error) // RemoteSpawn creates a new process at a remote node. The object name is a regitered // behavior on a remote name using RegisterBehavior(...). The given options will stored // in the process environment using node.EnvKeyRemoteSpawn as a key RemoteSpawn(node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) RemoteSpawnWithTimeout(timeout int, node string, object string, opts RemoteSpawnOptions, args ...etf.Term) (etf.Pid, error) // Name returns process name used on starting. Name() string // RegisterName register associates the name with pid (not overrides registered name on starting) RegisterName(name string) error // UnregisterName unregister named process. Unregistering name is allowed to the owner only UnregisterName(name string) error // NodeName returns node name NodeName() string // NodeStop stops the node NodeStop() // NodeUptime returns node lifespan NodeUptime() int64 // Info returns process details Info() ProcessInfo // Self returns registered process identificator belongs to the process Self() etf.Pid // Direct make a direct request to the actor (gen.Application, gen.Supervisor, gen.Server or // inherited from gen.Server actor) with default timeout 5 seconds Direct(request interface{}) (interface{}, error) // DirectWithTimeout make a direct request to the actor with the given timeout (in seconds) DirectWithTimeout(request interface{}, timeout int) (interface{}, error) // Send sends a message in fashion of 'erlang:send'. The value of 'to' can be a Pid, registered local name // or gen.ProcessID{RegisteredName, NodeName} Send(to interface{}, message etf.Term) error // SendAfter starts a timer. When the timer expires, the message sends to the process // identified by 'to'. 
'to' can be a Pid, registered local name or // gen.ProcessID{RegisteredName, NodeName}. Returns cancel function in order to discard // sending a message. CancelFunc returns bool value. If it returns false, than the timer has // already expired and the message has been sent. SendAfter(to interface{}, message etf.Term, after time.Duration) CancelFunc // Exit initiate a graceful stopping process Exit(reason string) error // Kill immediately stops process Kill() // CreateAlias creates a new alias for the Process CreateAlias() (etf.Alias, error) // DeleteAlias deletes the given alias DeleteAlias(alias etf.Alias) error // ListEnv returns a map of configured environment variables. // It also includes environment variables from the GroupLeader, Parent and Node. // which are overlapped by priority: Process(Parent(GroupLeader(Node))) ListEnv() map[EnvKey]interface{} // SetEnv set environment variable with given name. Use nil value to remove variable with given name. SetEnv(name EnvKey, value interface{}) // Env returns value associated with given environment name. Env(name EnvKey) interface{} // Wait waits until process stopped Wait() // WaitWithTimeout waits until process stopped. Return ErrTimeout // if given timeout is exceeded WaitWithTimeout(d time.Duration) error // Link creates a link between the calling process and another process. // Links are bidirectional and there can only be one link between two processes. // Repeated calls to Process.Link(Pid) have no effect. If one of the participants // of a link terminates, it will send an exit signal to the other participant and caused // termination of the last one. If process set a trap using Process.SetTrapExit(true) the exit signal transorms into the MessageExit and delivers as a regular message. Link(with etf.Pid) error // Unlink removes the link, if there is one, between the calling process and // the process referred to by Pid. 
Unlink(with etf.Pid) error // IsAlive returns whether the process is alive IsAlive() bool // SetTrapExit enables/disables the trap on terminate process. When a process is trapping exits, // it will not terminate when an exit signal is received. Instead, the signal is transformed // into a 'gen.MessageExit' which is put into the mailbox of the process just like a regular message. SetTrapExit(trap bool) // TrapExit returns whether the trap was enabled on this process TrapExit() bool // Compression returns true if compression is enabled for this process Compression() bool // SetCompression enables/disables compression for the messages sent outside of this node SetCompression(enabled bool) // CompressionLevel returns comression level for the process CompressionLevel() int // SetCompressionLevel defines compression level. Value must be in range: // 1 (best speed) ... 9 (best compression), or -1 for the default compression level SetCompressionLevel(level int) bool // CompressionThreshold returns compression threshold for the process CompressionThreshold() int // SetCompressionThreshold defines the minimal size for the message that must be compressed // Value must be greater than DefaultCompressionThreshold (1024) SetCompressionThreshold(threshold int) bool // MonitorNode creates monitor between the current process and node. If Node fails or does not exist, // the message MessageNodeDown is delivered to the process. MonitorNode(name string) etf.Ref // DemonitorNode removes monitor. Returns false if the given reference wasn't found DemonitorNode(ref etf.Ref) bool // MonitorProcess creates monitor between the processes. // Allowed types for the 'process' value: etf.Pid, gen.ProcessID // When a process monitor is triggered, a MessageDown sends to the caller. // Note: The monitor request is an asynchronous signal. That is, it takes // time before the signal reaches its destination. MonitorProcess(process interface{}) etf.Ref // DemonitorProcess removes monitor. 
Returns false if the given reference wasn't found DemonitorProcess(ref etf.Ref) bool // Behavior returns the object this process runs on. Behavior() ProcessBehavior // GroupLeader returns group leader process. Usually it points to the application process. GroupLeader() Process // Parent returns parent process. It returns nil if this process was spawned using Node.Spawn. Parent() Process // Context returns process context. Context() context.Context // Children returns list of children pid (Application, Supervisor) Children() ([]etf.Pid, error) // Links returns list of the process pids this process has linked to. Links() []etf.Pid // Monitors returns list of monitors created this process by pid. Monitors() []etf.Pid // Monitors returns list of monitors created this process by name. MonitorsByName() []ProcessID // MonitoredBy returns list of process pids monitored this process. MonitoredBy() []etf.Pid // Aliases returns list of aliases of this process. Aliases() []etf.Alias // RegisterEvent RegisterEvent(event Event, messages ...EventMessage) error UnregisterEvent(event Event) error MonitorEvent(event Event) error DemonitorEvent(event Event) error SendEventMessage(event Event, message EventMessage) error PutSyncRequest(ref etf.Ref) error CancelSyncRequest(ref etf.Ref) WaitSyncReply(ref etf.Ref, timeout int) (etf.Term, error) PutSyncReply(ref etf.Ref, term etf.Term, err error) error ProcessChannels() ProcessChannels }
Process
type ProcessBehavior ¶
type ProcessBehavior interface { ProcessInit(Process, ...etf.Term) (ProcessState, error) ProcessLoop(ProcessState, chan<- bool) string // method which implements control flow of process }
ProcessBehavior interface contains methods you should implement to make your own process behavior
type ProcessChannels ¶
type ProcessChannels struct { Mailbox <-chan ProcessMailboxMessage Direct <-chan ProcessDirectMessage GracefulExit <-chan ProcessGracefulExitRequest }
ProcessChannels
type ProcessDirectMessage ¶
ProcessDirectMessage
type ProcessGracefulExitRequest ¶
ProcessGracefulExitRequest
type ProcessInfo ¶
type ProcessInfo struct { PID etf.Pid Name string CurrentFunction string Status string MessageQueueLen int Links []etf.Pid Monitors []etf.Pid MonitorsByName []ProcessID MonitoredBy []etf.Pid Aliases []etf.Alias Dictionary etf.Map TrapExit bool GroupLeader etf.Pid Compression bool }
ProcessInfo struct with process details
type ProcessMailboxMessage ¶
ProcessMailboxMessage
type ProcessOptions ¶
type ProcessOptions struct { // Context allows mixing the system context with the custom one. E.g., to limit // the lifespan using context.WithTimeout. This context MUST be based on the // other Process' context. Otherwise, you get the error lib.ErrProcessContext Context context.Context // MailboxSize defines the length of message queue for the process MailboxSize uint16 // DirectboxSize defines the length of message queue for the direct requests DirectboxSize uint16 // GroupLeader GroupLeader Process // Env set the process environment variables Env map[EnvKey]interface{} // Fallback defines the process to where messages will be forwarded // if the mailbox is overflowed. The tag value could be used to // differentiate the source processes. Forwarded messages are wrapped // into the MessageFallback struct. Fallback ProcessFallback }
ProcessOptions
type Raft ¶
type Raft struct {
Server
}
func (*Raft) HandleCall ¶
func (r *Raft) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleCall
func (*Raft) HandleCancel ¶
func (r *Raft) HandleCancel(process *RaftProcess, ref etf.Ref, reason string) RaftStatus
HandleCancel
func (*Raft) HandleCast ¶
func (r *Raft) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
HandleCast
func (*Raft) HandleInfo ¶
func (r *Raft) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
HandleInfo
func (*Raft) HandleLeader ¶
func (r *Raft) HandleLeader(process *RaftProcess, leader *RaftLeader) RaftStatus
HandleLeader
func (*Raft) HandlePeer ¶
func (r *Raft) HandlePeer(process *RaftProcess, peer etf.Pid, serial uint64) RaftStatus
HandlePeer
func (*Raft) HandleQuorum ¶
func (r *Raft) HandleQuorum(process *RaftProcess, quorum *RaftQuorum) RaftStatus
HandleQuorum
func (*Raft) HandleRaftCall ¶
func (r *Raft) HandleRaftCall(process *RaftProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleRaftCall
func (*Raft) HandleRaftCast ¶
func (r *Raft) HandleRaftCast(process *RaftProcess, message etf.Term) ServerStatus
HandleRaftCast
func (*Raft) HandleRaftDirect ¶
func (r *Raft) HandleRaftDirect(process *RaftProcess, message interface{}) (interface{}, error)
HandleRaftDirect
func (*Raft) HandleRaftInfo ¶
func (r *Raft) HandleRaftInfo(process *RaftProcess, message etf.Term) ServerStatus
HandleRaftInfo
func (*Raft) HandleSerial ¶
func (r *Raft) HandleSerial(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus
HandleSerial
type RaftBehavior ¶
type RaftBehavior interface { ServerBehavior InitRaft(process *RaftProcess, arr ...etf.Term) (RaftOptions, error) // HandleAppend. Invokes on append request. To cancel this request by a leader, it must return RaftStatusDiscard. HandleAppend(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus // HandleGet HandleGet(process *RaftProcess, serial uint64) (string, etf.Term, RaftStatus) // HandlePeer HandlePeer(process *RaftProcess, peer etf.Pid, serial uint64) RaftStatus // HandleQuorum HandleQuorum(process *RaftProcess, quorum *RaftQuorum) RaftStatus // HandleLeader HandleLeader(process *RaftProcess, leader *RaftLeader) RaftStatus // HandleCancel HandleCancel(process *RaftProcess, ref etf.Ref, reason string) RaftStatus // HandleSerial HandleSerial(process *RaftProcess, ref etf.Ref, serial uint64, key string, value etf.Term) RaftStatus // HandleRaftCall this callback is invoked on ServerProcess.Call. This method is optional // for the implementation HandleRaftCall(process *RaftProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleRaftCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleRaftCast(process *RaftProcess, message etf.Term) ServerStatus // HandleRaftInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleRaftInfo(process *RaftProcess, message etf.Term) ServerStatus // HandleRaftDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleRaftDirect(process *RaftProcess, message interface{}) (interface{}, error) }
type RaftLeader ¶
type RaftLeader struct { Leader etf.Pid Serial uint64 State RaftQuorumState }
type RaftOptions ¶
type RaftProcess ¶
type RaftProcess struct { ServerProcess // contains filtered or unexported fields }
func (*RaftProcess) AppendWithTimeout ¶
AppendWithTimeout
func (*RaftProcess) Get ¶
func (rp *RaftProcess) Get(serial uint64) (etf.Ref, error)
Get makes a request to the quorum member to get the data with the given serial number and sets the timeout to the DefaultRaftGetTimeout = 5 sec. It returns ErrRaftNoQuorum if quorum forming is still in progress.
func (*RaftProcess) GetWithTimeout ¶
GetWithTimeout makes a request to the quorum member to get the data with the given serial number and timeout in seconds. Returns a reference of this request. Once the requested data has arrived, the callback HandleSerial will be invoked. If a timeout occurs, the callback HandleCancel will be invoked with reason "timeout".
func (*RaftProcess) Join ¶
func (rp *RaftProcess) Join(peer interface{}) error
Join makes a join request to the given peer, which is supposed to be in a raft cluster
func (*RaftProcess) Leader ¶
func (rp *RaftProcess) Leader() *RaftLeader
Leader returns current leader in the quorum. It returns nil if this process is not a quorum or if leader election is still in progress
func (*RaftProcess) Peers ¶
func (rp *RaftProcess) Peers() []etf.Pid
Peers returns list of the processes in the raft cluster. Note, this list is sorted by the Serial value on them in the descending order
func (*RaftProcess) Quorum ¶
func (rp *RaftProcess) Quorum() *RaftQuorum
Quorum returns current quorum. It returns nil if the quorum hasn't been built yet.
func (*RaftProcess) Serial ¶
func (rp *RaftProcess) Serial() uint64
Serial returns current value of serial for this raft process
type RaftQuorum ¶
type RaftQuorum struct { Member bool State RaftQuorumState Peers []etf.Pid // the number of participants in quorum could be 3,5,7,9,11 }
type RaftQuorumState ¶
type RaftQuorumState int
type RaftStatus ¶
type RaftStatus error
type RegisteredBehavior ¶
type RegisteredBehavior struct { Behavior ProcessBehavior Data interface{} }
RegisteredBehavior
type RemoteSpawnOptions ¶
type RemoteSpawnOptions struct { // Name register associated name with spawned process Name string // Monitor enables monitor on the spawned process using provided reference Monitor etf.Ref // Link enables link between the calling and spawned processes Link bool // Function in order to support {M,F,A} request to the Erlang node Function string }
RemoteSpawnOptions defines options for RemoteSpawn method
type RemoteSpawnRequest ¶
type RemoteSpawnRequest struct { From etf.Pid Ref etf.Ref Options RemoteSpawnOptions }
RemoteSpawnRequest
type Saga ¶
type Saga struct {
Server
}
Saga
func (*Saga) HandleCall ¶
func (gs *Saga) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleCall
func (*Saga) HandleCast ¶
func (gs *Saga) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
HandleCast
func (*Saga) HandleDirect ¶
func (gs *Saga) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleDirect
func (*Saga) HandleInfo ¶
func (gs *Saga) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
HandleInfo
func (*Saga) HandleJobFailed ¶
func (gs *Saga) HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus
HandleJobFailed
func (*Saga) HandleJobInterim ¶
func (gs *Saga) HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus
HandleJobInterim
func (*Saga) HandleJobResult ¶
func (gs *Saga) HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus
HandleJobResult
func (*Saga) HandleSagaCall ¶
func (gs *Saga) HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleSagaCall
func (*Saga) HandleSagaCast ¶
func (gs *Saga) HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus
HandleSagaCast
func (*Saga) HandleSagaDirect ¶
func (gs *Saga) HandleSagaDirect(process *SagaProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleSagaDirect
func (*Saga) HandleSagaInfo ¶
func (gs *Saga) HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus
HandleSagaInfo
func (*Saga) HandleTxCommit ¶
func (gs *Saga) HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus
HandleTxCommit
func (*Saga) HandleTxDone ¶
func (gs *Saga) HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus)
HandleTxDone
func (*Saga) HandleTxInterim ¶
func (gs *Saga) HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus
HandleTxInterim
type SagaBehavior ¶
type SagaBehavior interface { ServerBehavior // InitSaga InitSaga(process *SagaProcess, args ...etf.Term) (SagaOptions, error) // HandleTxNew invokes on a new TX receiving by this saga. HandleTxNew(process *SagaProcess, id SagaTransactionID, value interface{}) SagaStatus // HandleTxResult invoked on a receiving result from the next saga HandleTxResult(process *SagaProcess, id SagaTransactionID, from SagaNextID, result interface{}) SagaStatus // HandleTxCancel invoked on a request of transaction cancelation. HandleTxCancel(process *SagaProcess, id SagaTransactionID, reason string) SagaStatus // HandleTxDone invoked when the transaction is done on a saga where it was created. // It returns the final result and SagaStatus. The commit message will deliver the final // result to all participants of this transaction (if it has enabled the TwoPhaseCommit option). // Otherwise the final result will be ignored. HandleTxDone(process *SagaProcess, id SagaTransactionID, result interface{}) (interface{}, SagaStatus) // HandleTxInterim invoked if received interim result from the next hop HandleTxInterim(process *SagaProcess, id SagaTransactionID, from SagaNextID, interim interface{}) SagaStatus // HandleTxCommit invoked if TwoPhaseCommit option is enabled for the given TX. // All sagas involved in this TX receive a commit message with final value and invoke this callback. // The final result has a value returned by HandleTxDone on a Saga created this TX. HandleTxCommit(process *SagaProcess, id SagaTransactionID, final interface{}) SagaStatus // HandleJobResult HandleJobResult(process *SagaProcess, id SagaTransactionID, from SagaJobID, result interface{}) SagaStatus // HandleJobInterim HandleJobInterim(process *SagaProcess, id SagaTransactionID, from SagaJobID, interim interface{}) SagaStatus // HandleJobFailed HandleJobFailed(process *SagaProcess, id SagaTransactionID, from SagaJobID, reason string) SagaStatus // HandleStageCall this callback is invoked on ServerProcess.Call. 
This method is optional // for the implementation HandleSagaCall(process *SagaProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleSagaCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleSagaCast(process *SagaProcess, message etf.Term) ServerStatus // HandleSagaInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleSagaInfo(process *SagaProcess, message etf.Term) ServerStatus // HandleSagaDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleSagaDirect(process *SagaProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus) }
SagaBehavior interface
type SagaJob ¶
type SagaJob struct { ID SagaJobID TransactionID SagaTransactionID Value interface{} // contains filtered or unexported fields }
SagaJob
type SagaNext ¶
type SagaNext struct { // Saga etf.Pid, string (for the locally registered process), gen.ProcessID{process, node} (for the remote process) Saga interface{} // Value a value for the invoking HandleTxNew on a next hop. Value interface{} // Timeout how long this Saga will be waiting for the result from the next hop. Default - 10 seconds Timeout uint // TrapCancel if the next saga fails, it will transform the cancel signal into the regular message gen.MessageSagaCancel, and HandleSagaInfo callback will be invoked. TrapCancel bool // contains filtered or unexported fields }
SagaNext
type SagaNextID ¶
SagaNextID
type SagaOptions ¶
type SagaOptions struct { // MaxTransactions defines the limit for the number of active transactions. Default: 0 (unlimited) MaxTransactions uint // Worker Worker SagaWorkerBehavior }
SagaOptions
type SagaProcess ¶
type SagaProcess struct { ServerProcess // contains filtered or unexported fields }
SagaProcess
func (*SagaProcess) CancelJob ¶
func (sp *SagaProcess) CancelJob(id SagaTransactionID, job SagaJobID, reason string) error
CancelJob
func (*SagaProcess) CancelTransaction ¶
func (sp *SagaProcess) CancelTransaction(id SagaTransactionID, reason string) error
CancelTransaction
func (*SagaProcess) Next ¶
func (sp *SagaProcess) Next(id SagaTransactionID, next SagaNext) (SagaNextID, error)
Next
func (*SagaProcess) SendInterim ¶
func (sp *SagaProcess) SendInterim(id SagaTransactionID, interim interface{}) error
SendInterim
func (*SagaProcess) SendResult ¶
func (sp *SagaProcess) SendResult(id SagaTransactionID, result interface{}) error
SendResult
func (*SagaProcess) StartJob ¶
func (sp *SagaProcess) StartJob(id SagaTransactionID, options SagaJobOptions, value interface{}) (SagaJobID, error)
StartJob
func (*SagaProcess) StartTransaction ¶
func (sp *SagaProcess) StartTransaction(options SagaTransactionOptions, value interface{}) SagaTransactionID
StartTransaction
type SagaTransaction ¶
SagaTransaction
type SagaTransactionID ¶
SagaTransactionID
type SagaTransactionOptions ¶
type SagaTransactionOptions struct { // HopLimit defines the number of hops within the transaction. Default limit // is 0 (no limit). HopLimit uint // Lifespan defines a lifespan for the transaction in seconds. Default is 60. Lifespan uint // TwoPhaseCommit enables 2PC for the transaction. This option makes all // Sagas involved in this transaction invoke HandleTxCommit callback on them and // invoke HandleJobCommit callback on Worker processes once the transaction is finished. TwoPhaseCommit bool }
SagaTransactionOptions
type SagaWorker ¶
type SagaWorker struct {
Server
}
SagaWorker
func (*SagaWorker) HandleCall ¶
func (w *SagaWorker) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleCall
func (*SagaWorker) HandleCast ¶
func (w *SagaWorker) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
HandleCast
func (*SagaWorker) HandleDirect ¶
func (w *SagaWorker) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleDirect
func (*SagaWorker) HandleInfo ¶
func (w *SagaWorker) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
HandleInfo
func (*SagaWorker) HandleJobCommit ¶
func (w *SagaWorker) HandleJobCommit(process *SagaWorkerProcess, final interface{})
HandleJobCommit
func (*SagaWorker) HandleWorkerCall ¶
func (w *SagaWorker) HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleWorkerCall
func (*SagaWorker) HandleWorkerCast ¶
func (w *SagaWorker) HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus
HandleWorkerCast
func (*SagaWorker) HandleWorkerDirect ¶
func (w *SagaWorker) HandleWorkerDirect(process *SagaWorkerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleWorkerDirect
func (*SagaWorker) HandleWorkerInfo ¶
func (w *SagaWorker) HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus
HandleWorkerInfo
func (*SagaWorker) HandleWorkerTerminate ¶
func (w *SagaWorker) HandleWorkerTerminate(process *SagaWorkerProcess, reason string)
HandleWorkerTerminate
func (*SagaWorker) Init ¶
func (w *SagaWorker) Init(process *ServerProcess, args ...etf.Term) error
Init
func (*SagaWorker) Terminate ¶
func (w *SagaWorker) Terminate(process *ServerProcess, reason string)
Terminate
type SagaWorkerBehavior ¶
type SagaWorkerBehavior interface { ServerBehavior // HandleJobStart invoked on a worker start HandleJobStart(process *SagaWorkerProcess, job SagaJob) error // HandleJobCancel invoked if transaction was canceled before the termination. HandleJobCancel(process *SagaWorkerProcess, reason string) // HandleJobCommit invoked if this job was a part of the transaction // with enabled TwoPhaseCommit option. All workers involved in this TX // handling are receiving this call. Callback invoked before the termination. HandleJobCommit(process *SagaWorkerProcess, final interface{}) // HandleWorkerInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleWorkerInfo(process *SagaWorkerProcess, message etf.Term) ServerStatus // HandleWorkerCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleWorkerCast(process *SagaWorkerProcess, message etf.Term) ServerStatus // HandleWorkerCall this callback is invoked on ServerProcess.Call. This method is optional // for the implementation HandleWorkerCall(process *SagaWorkerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleWorkerDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleWorkerDirect(process *SagaWorkerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus) // HandleWorkerTerminate this callback invoked on a process termination HandleWorkerTerminate(process *SagaWorkerProcess, reason string) }
SagaWorkerBehavior
type SagaWorkerProcess ¶
type SagaWorkerProcess struct { ServerProcess // contains filtered or unexported fields }
SagaWorkerProcess
func (*SagaWorkerProcess) SendInterim ¶
func (wp *SagaWorkerProcess) SendInterim(interim interface{}) error
SendInterim
func (*SagaWorkerProcess) SendResult ¶
func (wp *SagaWorkerProcess) SendResult(result interface{}) error
SendResult sends the result and terminates this worker if 2PC is disabled. Otherwise, will be waiting for cancel/commit signal.
type Server ¶
type Server struct {
ServerBehavior
}
Server is implementation of ProcessBehavior interface for Server objects
func (*Server) HandleCall ¶
func (gs *Server) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleCall
func (*Server) HandleCast ¶
func (gs *Server) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
HandleCast
func (*Server) HandleDirect ¶
func (gs *Server) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleDirect
func (*Server) HandleInfo ¶
func (gs *Server) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
HandleInfo
func (*Server) ProcessInit ¶
ProcessInit
func (*Server) ProcessLoop ¶
func (gs *Server) ProcessLoop(ps ProcessState, started chan<- bool) string
ProcessLoop
func (*Server) Terminate ¶
func (gs *Server) Terminate(process *ServerProcess, reason string)
Terminate
type ServerBehavior ¶
type ServerBehavior interface { ProcessBehavior // Init invoked on a start Server Init(process *ServerProcess, args ...etf.Term) error // HandleCast invoked if Server received message sent with ServerProcess.Cast. // Return ServerStatusStop to stop server with "normal" reason. Use ServerStatus(error) // for the custom reason HandleCast(process *ServerProcess, message etf.Term) ServerStatus // HandleCall invoked if Server got sync request using ServerProcess.Call HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleDirect invoked on a direct request made with Process.Direct HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus) // HandleInfo invoked if Server received message sent with Process.Send. HandleInfo(process *ServerProcess, message etf.Term) ServerStatus // Terminate invoked on a termination process. ServerProcess.State is not locked during // this callback. Terminate(process *ServerProcess, reason string) }
ServerBehavior interface
type ServerProcess ¶
type ServerProcess struct { ProcessState // contains filtered or unexported fields }
ServerProcess state of the Server process.
func (*ServerProcess) Call ¶
Call makes outgoing sync request in fashion of 'gen_server:call'. 'to' can be Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}.
func (*ServerProcess) CallWithTimeout ¶
func (sp *ServerProcess) CallWithTimeout(to interface{}, message etf.Term, timeout int) (etf.Term, error)
CallWithTimeout makes outgoing sync request in fashion of 'gen_server:call' with given timeout.
func (*ServerProcess) Cast ¶
func (sp *ServerProcess) Cast(to interface{}, message etf.Term) error
Cast sends a message in fashion of 'gen_server:cast'. 'to' can be a Pid, registered local name or gen.ProcessID{RegisteredName, NodeName}
func (*ServerProcess) CastAfter ¶
func (sp *ServerProcess) CastAfter(to interface{}, message etf.Term, after time.Duration) CancelFunc
CastAfter a simple wrapper for Process.SendAfter to send a message in fashion of 'gen_server:cast'
func (*ServerProcess) MessageCounter ¶
func (sp *ServerProcess) MessageCounter() uint64
MessageCounter returns the total number of messages handled by Server callbacks: HandleCall, HandleCast, HandleInfo, HandleDirect
func (*ServerProcess) Reply ¶
Reply the handling of process.Direct(...) calls can be done asynchronously using gen.DirectStatusIgnore as the returned status in the HandleDirect callback. In this case, you must reply manually using the gen.ServerProcess.Reply method in any other callback. If a caller has canceled this request due to timeout, it returns lib.ErrReferenceUnknown
func (*ServerProcess) SendReply ¶
func (sp *ServerProcess) SendReply(from ServerFrom, reply etf.Term) error
SendReply sends a reply message to the sender that made the ServerProcess.Call request. Useful for the case with a dispatcher and a pool of workers: the Dispatcher process forwards Call requests (asynchronously) within a HandleCall callback to the worker(s) using ServerProcess.Cast or ServerProcess.Send but returns ServerStatusIgnore instead of ServerStatusOK; the Worker process sends the result using the ServerProcess.SendReply method with the 'from' value received from the Dispatcher.
type ServerStatus ¶
type ServerStatus error
ServerStatus
func ServerStatusStopWithReason ¶
func ServerStatusStopWithReason(s string) ServerStatus
ServerStatusStopWithReason
type Stage ¶
type Stage struct {
Server
}
func (*Stage) HandleCall ¶
func (gst *Stage) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*Stage) HandleCancel ¶
func (gst *Stage) HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus
HandleCancel
func (*Stage) HandleCanceled ¶
func (gst *Stage) HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus
HandleCanceled
func (*Stage) HandleCast ¶
func (gst *Stage) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*Stage) HandleDemand ¶
func (gst *Stage) HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus)
HandleDemand
func (*Stage) HandleDirect ¶
func (gst *Stage) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
func (*Stage) HandleEvents ¶
func (gst *Stage) HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus
HandleEvents
func (*Stage) HandleInfo ¶
func (gst *Stage) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*Stage) HandleStageCall ¶
func (gst *Stage) HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleStageCall
func (*Stage) HandleStageCast ¶
func (gst *Stage) HandleStageCast(process *StageProcess, message etf.Term) ServerStatus
HandleStageCast
func (*Stage) HandleStageDirect ¶
func (gst *Stage) HandleStageDirect(process *StageProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleStageDirect
func (*Stage) HandleStageInfo ¶
func (gst *Stage) HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus
HandleStageInfo
func (*Stage) HandleStageTerminate ¶
func (gst *Stage) HandleStageTerminate(process *StageProcess, reason string)
func (*Stage) HandleSubscribe ¶
func (gst *Stage) HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus
HandleSubscribe
func (*Stage) HandleSubscribed ¶
func (gst *Stage) HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus)
HandleSubscribed
func (*Stage) Init ¶
func (gst *Stage) Init(process *ServerProcess, args ...etf.Term) error
gen.Server callbacks
func (*Stage) InitStage ¶
func (gst *Stage) InitStage(process *StageProcess, args ...etf.Term) error
InitStage
func (*Stage) SetCancelMode ¶
func (s *Stage) SetCancelMode(p Process, subscription StageSubscription, cancel StageCancelMode) error
SetCancelMode defines how the consumer will handle termination of the producer. There are 3 modes: StageCancelPermanent (default) - the consumer exits when the producer cancels or exits; StageCancelTransient - the consumer exits only if the reason is not normal, shutdown, or {shutdown, reason}; StageCancelTemporary - the consumer never exits.
func (*Stage) Terminate ¶
func (gst *Stage) Terminate(process *ServerProcess, reason string)
type StageBehavior ¶
type StageBehavior interface { ServerBehavior // InitStage InitStage(process *StageProcess, args ...etf.Term) (StageOptions, error) // HandleDemand this callback is invoked on a producer stage // The producer that implements this callback must either store the demand, or return the amount of requested events. HandleDemand(process *StageProcess, subscription StageSubscription, count uint) (etf.List, StageStatus) // HandleEvents this callback is invoked on a consumer stage. HandleEvents(process *StageProcess, subscription StageSubscription, events etf.List) StageStatus // HandleSubscribe This callback is invoked on a producer stage. HandleSubscribe(process *StageProcess, subscription StageSubscription, options StageSubscribeOptions) StageStatus // HandleSubscribed this callback is invoked as a confirmation for the subscription request // Returning false means that demand must be sent to producers explicitly using Ask method. // Returning true means the stage implementation will take care of automatically sending. HandleSubscribed(process *StageProcess, subscription StageSubscription, opts StageSubscribeOptions) (bool, StageStatus) // HandleCancel // Invoked when a consumer is no longer subscribed to a producer (invoked on a producer stage) // The cancelReason will be a {Cancel: "cancel", Reason: _} if the reason for cancellation // was a Stage.Cancel call. Any other value means the cancellation reason was // due to an EXIT. HandleCancel(process *StageProcess, subscription StageSubscription, reason string) StageStatus // HandleCanceled // Invoked when a consumer is no longer subscribed to a producer (invoked on a consumer stage) // Termination this stage depends on a cancel mode for the given subscription. For the cancel mode // StageCancelPermanent - this stage will be terminated right after this callback invoking. // For the cancel mode StageCancelTransient - it depends on a reason of subscription canceling. 
// Cancel mode StageCancelTemporary keeps this stage alive whether the reason could be. HandleCanceled(process *StageProcess, subscription StageSubscription, reason string) StageStatus // HandleStageCall this callback is invoked on ServerProcess.Call. This method is optional // for the implementation HandleStageCall(process *StageProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) // HandleStageDirect this callback is invoked on Process.Direct. This method is optional // for the implementation HandleStageDirect(process *StageProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus) // HandleStageCast this callback is invoked on ServerProcess.Cast. This method is optional // for the implementation HandleStageCast(process *StageProcess, message etf.Term) ServerStatus // HandleStageInfo this callback is invoked on Process.Send. This method is optional // for the implementation HandleStageInfo(process *StageProcess, message etf.Term) ServerStatus // HandleStageTerminate this callback is invoked on a termination process HandleStageTerminate(process *StageProcess, reason string) }
StageBehavior interface for the Stage implementation
type StageCancelMode ¶
type StageCancelMode uint
const ( StageCancelPermanent StageCancelMode = 0 StageCancelTransient StageCancelMode = 1 StageCancelTemporary StageCancelMode = 2 )
type StageCancelReason ¶
type StageDispatchItem ¶
type StageDispatchItem struct {
// contains filtered or unexported fields
}
type StageDispatcherBehavior ¶
type StageDispatcherBehavior interface { // InitStageDispatcher(opts) Init(opts StageOptions) (state interface{}) // Ask called every time a consumer sends demand Ask(state interface{}, subscription StageSubscription, count uint) // Cancel called every time a subscription is cancelled or the consumer goes down. Cancel(state interface{}, subscription StageSubscription) // Dispatch called every time a producer wants to dispatch an event. Dispatch(state interface{}, events etf.List) []StageDispatchItem // Subscribe called every time the producer gets a new subscriber Subscribe(state interface{}, subscription StageSubscription, opts StageSubscribeOptions) error }
StageDispatcherBehavior defines the interface for the dispatcher implementation. To create a custom dispatcher you should implement this interface and use it in StageOptions as a Dispatcher.
func CreateStageDispatcherBroadcast ¶
func CreateStageDispatcherBroadcast() StageDispatcherBehavior
CreateStageDispatcherBroadcast creates a dispatcher that accumulates demand from all consumers before broadcasting events to all of them. This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer. The demand is only sent upstream once all consumers ask for data.
func CreateStageDispatcherDemand ¶
func CreateStageDispatcherDemand() StageDispatcherBehavior
CreateStageDispatcherDemand creates a dispatcher that sends batches to the highest demand. This is the default dispatcher used by Stage. In order to avoid greedy consumers, it is recommended that all consumers have exactly the same maximum demand.
func CreateStageDispatcherPartition ¶
func CreateStageDispatcherPartition(n uint, hash func(etf.Term) int) StageDispatcherBehavior
CreateStageDispatcherPartition creates a dispatcher that sends events according to partitions. The number of partitions 'n' must be > 0. 'hash' should return a number within the range [0,n). A value outside of this range causes the event to be discarded. If 'hash' is nil, a random partition will be used for every event.
type StageOptions ¶
type StageOptions struct { // DisableDemandHandle. By default the demand is always handled using the HandleDemand callback. // When this option is set to 'true', demands are accumulated until demand handling is // enabled again using the SetDemandHandle(true) method DisableDemandHandle bool // BufferSize the size of the buffer to store events without demand. // default value = defaultDispatcherBufferSize BufferSize uint // BufferKeepLast defines whether the first or last entries should be // kept in the buffer in case the buffer size is exceeded. BufferKeepLast bool Dispatcher StageDispatcherBehavior }
StageOptions defines the producer configuration using Init callback. It will be ignored if it acts as a consumer only.
type StageProcess ¶
type StageProcess struct { ServerProcess // contains filtered or unexported fields }
func (*StageProcess) Ask ¶
func (p *StageProcess) Ask(subscription StageSubscription, count uint) error
Ask makes a demand request for the given subscription. This function must only be used in the cases when a consumer has set a subscription to manual demand mode (using the ManualDemand subscribe option or SetAutoDemand with false).
func (*StageProcess) AutoDemand ¶
func (p *StageProcess) AutoDemand(subscription StageSubscription) (bool, error)
AutoDemand returns value of the auto demand option
func (*StageProcess) Cancel ¶
func (p *StageProcess) Cancel(subscription StageSubscription, reason string) error
Cancel
func (*StageProcess) CancelMode ¶
func (p *StageProcess) CancelMode(subscription StageSubscription) (StageCancelMode, error)
CancelMode returns current cancel mode for the consumer
func (*StageProcess) DemandHandle ¶
func (p *StageProcess) DemandHandle() bool
DemandHandle reports whether handling of demand requests is enabled.
func (*StageProcess) SendEvents ¶
func (p *StageProcess) SendEvents(events etf.List) error
SendEvents sends events to the subscribers
func (*StageProcess) SetAutoDemand ¶
func (p *StageProcess) SetAutoDemand(subscription StageSubscription, autodemand bool) error
SetAutoDemand setting this option to false means that demand must be sent to producers explicitly using the Ask method. This mode can be used when a special behavior is desired. Setting this option to true enables auto demand mode (this is the default mode for the consumer).
func (*StageProcess) SetCancelMode ¶
func (p *StageProcess) SetCancelMode(subscription StageSubscription, mode StageCancelMode) error
SetCancelMode defines how the consumer will handle termination of the producer. There are 3 modes: StageCancelPermanent (default) - the consumer exits when the producer cancels or exits; StageCancelTransient - the consumer exits only if the reason is not normal, shutdown, or {shutdown, reason}; StageCancelTemporary - the consumer never exits.
func (*StageProcess) SetDemandHandle ¶
func (p *StageProcess) SetDemandHandle(enable bool)
SetDemandHandle setting this option to false disables handling demand requests on a producer stage. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. By default this option is true.
func (*StageProcess) Subscribe ¶
func (p *StageProcess) Subscribe(producer etf.Term, opts StageSubscribeOptions) (StageSubscription, error)
Subscribe subscribes to the given producer. HandleSubscribed callback will be invoked on a consumer stage once a request for the subscription is sent. If something went wrong on a producer side the callback HandleCancel will be invoked with a reason of cancelation.
type StageStatus ¶
type StageStatus error
var ( StageStatusOK StageStatus = nil StageStatusStop StageStatus = fmt.Errorf("stop") StageStatusUnsupported StageStatus = fmt.Errorf("unsupported") StageStatusNotAProducer StageStatus = fmt.Errorf("not a producer") )
type StageSubscribeOptions ¶
type StageSubscribeOptions struct { MinDemand uint `etf:"min_demand"` MaxDemand uint `etf:"max_demand"` // The stage implementation will take care of automatically sending // demand to the producer (as a default behavior). You can disable it // by setting ManualDemand to true ManualDemand bool `etf:"manual"` // What should happen to the consumer if the producer has terminated // StageCancelPermanent the consumer exits when the producer cancels or exits. // StageCancelTransient the consumer exits only if reason is not "normal", // "shutdown", or {"shutdown", _} // StageCancelTemporary the consumer never exits Cancel StageCancelMode `etf:"cancel"` // Partition defines the number of the partition this subscription should belong to. // This option is used in the DispatcherPartition Partition uint `etf:"partition"` // Extra is intended to be a custom set of options for the custom implementation // of StageDispatcherBehavior Extra etf.Term `etf:"extra"` }
type Supervisor ¶
type Supervisor struct{}
Supervisor is implementation of ProcessBehavior interface
func (*Supervisor) ProcessInit ¶
func (sv *Supervisor) ProcessInit(p Process, args ...etf.Term) (ProcessState, error)
ProcessInit
func (*Supervisor) ProcessLoop ¶
func (sv *Supervisor) ProcessLoop(ps ProcessState, started chan<- bool) string
ProcessLoop
func (*Supervisor) StartChild ¶
func (sv *Supervisor) StartChild(supervisor Process, name string, args ...etf.Term) (Process, error)
StartChild dynamically starts a child process using the child spec with the given name, as defined by the Init call.
type SupervisorBehavior ¶
type SupervisorBehavior interface { ProcessBehavior Init(args ...etf.Term) (SupervisorSpec, error) }
SupervisorBehavior interface
type SupervisorChildSpec ¶
type SupervisorChildSpec struct { Name string Child ProcessBehavior Options ProcessOptions Args []etf.Term // contains filtered or unexported fields }
SupervisorChildSpec
type SupervisorSpec ¶
type SupervisorSpec struct { Name string Children []SupervisorChildSpec Strategy SupervisorStrategy // contains filtered or unexported fields }
SupervisorSpec
type SupervisorStrategy ¶
type SupervisorStrategy struct { Type SupervisorStrategyType Intensity uint16 Period uint16 Restart SupervisorStrategyRestart }
SupervisorStrategy
type TCP ¶
type TCP struct {
Server
}
func (*TCP) HandleCall ¶
func (tcp *TCP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*TCP) HandleCast ¶
func (tcp *TCP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*TCP) HandleInfo ¶
func (tcp *TCP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*TCP) HandleTCPCall ¶
func (tcp *TCP) HandleTCPCall(process *TCPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleTCPCall
func (*TCP) HandleTCPCast ¶
func (tcp *TCP) HandleTCPCast(process *TCPProcess, message etf.Term) ServerStatus
HandleTCPCast
func (*TCP) HandleTCPInfo ¶
func (tcp *TCP) HandleTCPInfo(process *TCPProcess, message etf.Term) ServerStatus
HandleTCPInfo
func (*TCP) HandleTCPTerminate ¶
func (tcp *TCP) HandleTCPTerminate(process *TCPProcess, reason string)
func (*TCP) Init ¶
func (tcp *TCP) Init(process *ServerProcess, args ...etf.Term) error
Server callbacks
func (*TCP) Terminate ¶
func (tcp *TCP) Terminate(process *ServerProcess, reason string)
type TCPBehavior ¶
type TCPBehavior interface { ServerBehavior InitTCP(process *TCPProcess, args ...etf.Term) (TCPOptions, error) HandleTCPCall(process *TCPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleTCPCast(process *TCPProcess, message etf.Term) ServerStatus HandleTCPInfo(process *TCPProcess, message etf.Term) ServerStatus HandleTCPTerminate(process *TCPProcess, reason string) }
type TCPHandler ¶
type TCPHandler struct { Server // contains filtered or unexported fields }
func (*TCPHandler) HandleCall ¶
func (tcph *TCPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*TCPHandler) HandleCast ¶
func (tcph *TCPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*TCPHandler) HandleConnect ¶
func (tcph *TCPHandler) HandleConnect(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus
func (*TCPHandler) HandleDirect ¶
func (tcph *TCPHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
func (*TCPHandler) HandleDisconnect ¶
func (tcph *TCPHandler) HandleDisconnect(process *TCPHandlerProcess, conn *TCPConnection)
func (*TCPHandler) HandleInfo ¶
func (tcph *TCPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*TCPHandler) HandleTCPHandlerCall ¶
func (tcph *TCPHandler) HandleTCPHandlerCall(process *TCPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleTCPHandlerCall
func (*TCPHandler) HandleTCPHandlerCast ¶
func (tcph *TCPHandler) HandleTCPHandlerCast(process *TCPHandlerProcess, message etf.Term) ServerStatus
HandleTCPHandlerCast
func (*TCPHandler) HandleTCPHandlerInfo ¶
func (tcph *TCPHandler) HandleTCPHandlerInfo(process *TCPHandlerProcess, message etf.Term) ServerStatus
HandleTCPHandlerInfo
func (*TCPHandler) HandleTCPHandlerTerminate ¶
func (tcph *TCPHandler) HandleTCPHandlerTerminate(process *TCPHandlerProcess, reason string)
func (*TCPHandler) HandleTimeout ¶
func (tcph *TCPHandler) HandleTimeout(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus
func (*TCPHandler) Init ¶
func (tcph *TCPHandler) Init(process *ServerProcess, args ...etf.Term) error
func (*TCPHandler) Terminate ¶
func (tcph *TCPHandler) Terminate(process *ServerProcess, reason string)
type TCPHandlerBehavior ¶
type TCPHandlerBehavior interface { ServerBehavior // Mandatory callback HandlePacket(process *TCPHandlerProcess, packet []byte, conn *TCPConnection) (int, int, TCPHandlerStatus) // Optional callbacks HandleConnect(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus HandleDisconnect(process *TCPHandlerProcess, conn *TCPConnection) HandleTimeout(process *TCPHandlerProcess, conn *TCPConnection) TCPHandlerStatus HandleTCPHandlerCall(process *TCPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleTCPHandlerCast(process *TCPHandlerProcess, message etf.Term) ServerStatus HandleTCPHandlerInfo(process *TCPHandlerProcess, message etf.Term) ServerStatus HandleTCPHandlerTerminate(process *TCPHandlerProcess, reason string) }
type TCPHandlerProcess ¶
type TCPHandlerProcess struct { ServerProcess // contains filtered or unexported fields }
func (*TCPHandlerProcess) SetTrapExit ¶
func (tcpp *TCPHandlerProcess) SetTrapExit(trap bool)
we should disable SetTrapExit for the TCPHandlerProcess by overriding it.
type TCPHandlerStatus ¶
type TCPHandlerStatus error
var ( TCPHandlerStatusOK TCPHandlerStatus = nil TCPHandlerStatusClose TCPHandlerStatus = fmt.Errorf("close") )
type TCPOptions ¶
type TCPOptions struct { Host string Port uint16 TLS *tls.Config KeepAlivePeriod int Handler TCPHandlerBehavior // QueueLength defines how many parallel requests can be directed to this process. Default value is 10. QueueLength int // NumHandlers defines how many handlers will be started. Default 1 NumHandlers int // IdleTimeout defines how long (in seconds) keeps the started handler alive with no packets. Zero value makes the handler non-stop. IdleTimeout int DeadlineTimeout int MaxPacketSize int // ExtraHandlers enables starting new handlers if all handlers in the pool are busy. ExtraHandlers bool }
type TCPProcess ¶
type TCPProcess struct { ServerProcess // contains filtered or unexported fields }
type UDP ¶
type UDP struct {
Server
}
func (*UDP) HandleCall ¶
func (udp *UDP) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*UDP) HandleCast ¶
func (udp *UDP) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*UDP) HandleInfo ¶
func (udp *UDP) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*UDP) HandleUDPCall ¶
func (udp *UDP) HandleUDPCall(process *UDPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleUDPCall
func (*UDP) HandleUDPCast ¶
func (udp *UDP) HandleUDPCast(process *UDPProcess, message etf.Term) ServerStatus
HandleUDPCast
func (*UDP) HandleUDPInfo ¶
func (udp *UDP) HandleUDPInfo(process *UDPProcess, message etf.Term) ServerStatus
HandleUDPInfo
func (*UDP) HandleUDPTerminate ¶
func (udp *UDP) HandleUDPTerminate(process *UDPProcess, reason string)
func (*UDP) Init ¶
func (udp *UDP) Init(process *ServerProcess, args ...etf.Term) error
Server callbacks
func (*UDP) Terminate ¶
func (udp *UDP) Terminate(process *ServerProcess, reason string)
type UDPBehavior ¶
type UDPBehavior interface { ServerBehavior InitUDP(process *UDPProcess, args ...etf.Term) (UDPOptions, error) HandleUDPCall(process *UDPProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleUDPCast(process *UDPProcess, message etf.Term) ServerStatus HandleUDPInfo(process *UDPProcess, message etf.Term) ServerStatus HandleUDPTerminate(process *UDPProcess, reason string) }
type UDPHandler ¶
type UDPHandler struct {
Server
}
func (*UDPHandler) HandleCall ¶
func (udph *UDPHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*UDPHandler) HandleCast ¶
func (udph *UDPHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*UDPHandler) HandleInfo ¶
func (udph *UDPHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*UDPHandler) HandleTimeout ¶
func (udph *UDPHandler) HandleTimeout(process *UDPHandlerProcess)
func (*UDPHandler) HandleUDPHandlerCall ¶
func (udph *UDPHandler) HandleUDPHandlerCall(process *UDPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleUDPHandlerCall
func (*UDPHandler) HandleUDPHandlerCast ¶
func (udph *UDPHandler) HandleUDPHandlerCast(process *UDPHandlerProcess, message etf.Term) ServerStatus
HandleUDPHandlerCast
func (*UDPHandler) HandleUDPHandlerInfo ¶
func (udph *UDPHandler) HandleUDPHandlerInfo(process *UDPHandlerProcess, message etf.Term) ServerStatus
HandleUDPHandlerInfo
func (*UDPHandler) HandleUDPHandlerTerminate ¶
func (udph *UDPHandler) HandleUDPHandlerTerminate(process *UDPHandlerProcess, reason string)
func (*UDPHandler) Init ¶
func (udph *UDPHandler) Init(process *ServerProcess, args ...etf.Term) error
func (*UDPHandler) Terminate ¶
func (udph *UDPHandler) Terminate(process *ServerProcess, reason string)
type UDPHandlerBehavior ¶
type UDPHandlerBehavior interface { ServerBehavior // Mandatory callback HandlePacket(process *UDPHandlerProcess, data []byte, packet UDPPacket) // Optional callbacks HandleTimeout(process *UDPHandlerProcess) HandleUDPHandlerCall(process *UDPHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleUDPHandlerCast(process *UDPHandlerProcess, message etf.Term) ServerStatus HandleUDPHandlerInfo(process *UDPHandlerProcess, message etf.Term) ServerStatus HandleUDPHandlerTerminate(process *UDPHandlerProcess, reason string) }
type UDPHandlerProcess ¶
type UDPHandlerProcess struct { ServerProcess // contains filtered or unexported fields }
func (*UDPHandlerProcess) SetTrapExit ¶
func (udpp *UDPHandlerProcess) SetTrapExit(trap bool)
we should disable SetTrapExit for the UDPHandlerProcess by overriding it.
type UDPOptions ¶
type UDPProcess ¶
type UDPProcess struct { ServerProcess // contains filtered or unexported fields }
type Web ¶
type Web struct {
Server
}
func (*Web) HandleCall ¶
func (web *Web) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleCall
func (*Web) HandleCast ¶
func (web *Web) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
HandleCast
func (*Web) HandleDirect ¶
func (web *Web) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
HandleDirect
func (*Web) HandleInfo ¶
func (web *Web) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
HandleInfo
func (*Web) HandleWebCall ¶
func (web *Web) HandleWebCall(process *WebProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleWebCall
func (*Web) HandleWebCast ¶
func (web *Web) HandleWebCast(process *WebProcess, message etf.Term) ServerStatus
HandleWebCast
func (*Web) HandleWebInfo ¶
func (web *Web) HandleWebInfo(process *WebProcess, message etf.Term) ServerStatus
HandleWebInfo
func (*Web) Terminate ¶
func (web *Web) Terminate(process *ServerProcess, reason string)
type WebBehavior ¶
type WebBehavior interface { ServerBehavior // mandatory method InitWeb(process *WebProcess, args ...etf.Term) (WebOptions, error) // optional methods HandleWebCall(process *WebProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleWebCast(process *WebProcess, message etf.Term) ServerStatus HandleWebInfo(process *WebProcess, message etf.Term) ServerStatus }
type WebHandler ¶
type WebHandler struct { Server // contains filtered or unexported fields }
func (*WebHandler) HandleCall ¶
func (wh *WebHandler) HandleCall(process *ServerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
func (*WebHandler) HandleCast ¶
func (wh *WebHandler) HandleCast(process *ServerProcess, message etf.Term) ServerStatus
func (*WebHandler) HandleDirect ¶
func (wh *WebHandler) HandleDirect(process *ServerProcess, ref etf.Ref, message interface{}) (interface{}, DirectStatus)
func (*WebHandler) HandleInfo ¶
func (wh *WebHandler) HandleInfo(process *ServerProcess, message etf.Term) ServerStatus
func (*WebHandler) HandleWebHandlerCall ¶
func (wh *WebHandler) HandleWebHandlerCall(process *WebHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus)
HandleWebHandlerCall
func (*WebHandler) HandleWebHandlerCast ¶
func (wh *WebHandler) HandleWebHandlerCast(process *WebHandlerProcess, message etf.Term) ServerStatus
HandleWebHandlerCast
func (*WebHandler) HandleWebHandlerInfo ¶
func (wh *WebHandler) HandleWebHandlerInfo(process *WebHandlerProcess, message etf.Term) ServerStatus
HandleWebHandlerInfo
func (*WebHandler) HandleWebHandlerTerminate ¶
func (wh *WebHandler) HandleWebHandlerTerminate(process *WebHandlerProcess, reason string, count int64)
func (*WebHandler) Init ¶
func (wh *WebHandler) Init(process *ServerProcess, args ...etf.Term) error
func (*WebHandler) ServeHTTP ¶
func (wh *WebHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*WebHandler) Terminate ¶
func (wh *WebHandler) Terminate(process *ServerProcess, reason string)
type WebHandlerBehavior ¶
type WebHandlerBehavior interface { ServerBehavior // Mandatory callback HandleRequest(process *WebHandlerProcess, request WebMessageRequest) WebHandlerStatus // Optional callbacks HandleWebHandlerCall(process *WebHandlerProcess, from ServerFrom, message etf.Term) (etf.Term, ServerStatus) HandleWebHandlerCast(process *WebHandlerProcess, message etf.Term) ServerStatus HandleWebHandlerInfo(process *WebHandlerProcess, message etf.Term) ServerStatus HandleWebHandlerTerminate(process *WebHandlerProcess, reason string, count int64) // contains filtered or unexported methods }
type WebHandlerOptions ¶
type WebHandlerOptions struct { // Timeout for web-requests. The default timeout is 5 seconds. It can also be // overridden within HTTP requests using the header 'Request-Timeout' RequestTimeout int // RequestQueueLength defines how many parallel requests can be directed to this process. Default value is 10. RequestQueueLength int // NumHandlers defines how many handlers will be started. Default 1 NumHandlers int // IdleTimeout defines how long (in seconds) keep the started handler alive with no requests. Zero value makes handler not stop. IdleTimeout int }
type WebHandlerProcess ¶
type WebHandlerProcess struct { ServerProcess // contains filtered or unexported fields }
func (*WebHandlerProcess) SetTrapExit ¶
func (whp *WebHandlerProcess) SetTrapExit(trap bool)
we should disable SetTrapExit for the WebHandlerProcess by overriding it.
type WebHandlerStatus ¶
type WebHandlerStatus error
var ( WebHandlerStatusDone WebHandlerStatus = nil WebHandlerStatusWait WebHandlerStatus = fmt.Errorf("wait") )
type WebMessageRequest ¶
type WebOptions ¶
type WebProcess ¶
type WebProcess struct { ServerProcess // contains filtered or unexported fields }
func (*WebProcess) StartWebHandler ¶
func (wp *WebProcess) StartWebHandler(web WebHandlerBehavior, options WebHandlerOptions) http.Handler