Documentation
¶
Index ¶
- Constants
- Variables
- type ActorHandler
- type ActorHandlerConfig
- type ActorSet
- func (s *ActorSet) CancelProcessJobs(processID api.UUID, soft bool)
- func (s *ActorSet) Close()
- func (s *ActorSet) Get(id api.UUID) *ActorHandler
- func (s *ActorSet) GetOrCreate(id api.UUID) *ActorHandler
- func (s *ActorSet) SetOffline(id api.UUID)
- func (s *ActorSet) SetOnline(id api.UUID)
- func (s *ActorSet) Submit(job *Job)
- type ActorSetConfig
- type CancelProcessJobsArgs
- type EnvelopeEventHandler
- type EnvelopeWatcher
- type Job
- type JobQueue
- type JobRunner
- type Options
- type RingBuffer
- func (r *RingBuffer) Count() int
- func (r *RingBuffer) Empty() bool
- func (r *RingBuffer) Full() bool
- func (r *RingBuffer) PeekLast() (value *Job)
- func (r *RingBuffer) PushBack(v *Job) error
- func (r *RingBuffer) Read() (value *Job)
- func (r *RingBuffer) TakeBack() (value *Job)
- func (r *RingBuffer) Write(v *Job) error
- type Service
- type Subscription
Constants ¶
const ( ConfigEntryDelayMax = "xbus.director.delay.max" ConfigEntryDelayIncreaseFactor = "xbus.director.delay.increase-factor" )
const (
// DirectorQueue is the name we use to queueSub to a queue with the director
DirectorQueue = "director_queue"
)
Variables ¶
var ( ConfigEntryDelayMaxDefaultValue = 30 * time.Minute ConfigEntryDelayIncreaseFactorDefaultValue = 100 // 100% )
var ErrBufferFull = errors.New("RingBuffer is full")
ErrBufferFull is when the buffer is full
Functions ¶
This section is empty.
Types ¶
type ActorHandler ¶
type ActorHandler struct { DelayMax time.Duration DelayIncreaseFactor int // expressed in 0.01 // contains filtered or unexported fields }
ActorHandler is responsible for calling the 'Process' method of the actors, and handle the results
func (*ActorHandler) CancelProcessJobs ¶
func (h *ActorHandler) CancelProcessJobs(id api.UUID, soft bool)
func (*ActorHandler) Close ¶
func (h *ActorHandler) Close()
func (*ActorHandler) Start ¶
func (h *ActorHandler) Start()
func (*ActorHandler) Stop ¶
func (h *ActorHandler) Stop()
func (*ActorHandler) Submit ¶
func (h *ActorHandler) Submit(job *Job)
type ActorHandlerConfig ¶
type ActorHandlerConfig struct { Log zerolog.Logger Conn *nats.Conn ActorStorage storage.ActorStorage JobStorage storage.JobStorage ProcessManager *process.Manager EnvelopeStorage storage.EnvelopeStorage ActorMap *svc_tools.ActorMap Director directorServer EnvelopeWatcher *EnvelopeWatcher }
ActorHandlerConfig holds the settings for instanciating a ActorHandler
type ActorSet ¶
type ActorSet struct {
// contains filtered or unexported fields
}
ActorSet This API is NOT thread-safe
func (*ActorSet) CancelProcessJobs ¶
CancelProcessJobs cancel all running jobs that belongs to the given process
func (*ActorSet) Get ¶
func (s *ActorSet) Get(id api.UUID) *ActorHandler
Get returns the handler of a given actor, or nil if it does not exist
func (*ActorSet) GetOrCreate ¶
func (s *ActorSet) GetOrCreate(id api.UUID) *ActorHandler
GetOrCreate returns the handler of a given actor, creating it if needed
func (*ActorSet) SetOffline ¶
SetOnline stops the handler of the given actor
type ActorSetConfig ¶
type ActorSetConfig struct { Log zerolog.Logger Conn *nats.Conn ActorStorage storage.ActorStorage JobStorage storage.JobStorage ProcessManager *process.Manager EnvelopeStorage storage.EnvelopeStorage ActorMap *svc_tools.ActorMap Director directorServer EnvelopeWatcher *EnvelopeWatcher }
ActorSetConfig holds the settings for instanciating an ActorSet
type CancelProcessJobsArgs ¶
type EnvelopeEventHandler ¶
type EnvelopeEventHandler func(api.EnvelopeEvent)
EnvelopeEventHandler is a function that handles a api.EnvelopeEvent
type EnvelopeWatcher ¶
type EnvelopeWatcher struct {
// contains filtered or unexported fields
}
EnvelopeWatcher can monitor status changes on several envelopes
func NewEnvelopeWatcher ¶
func NewEnvelopeWatcher() *EnvelopeWatcher
NewEnvelopeWatcher initialize a EnvelopeWatcher
func (*EnvelopeWatcher) PushEvent ¶
func (w *EnvelopeWatcher) PushEvent(event api.EnvelopeEvent)
PushEvent pushes an event
func (*EnvelopeWatcher) Unsubscribe ¶
func (w *EnvelopeWatcher) Unsubscribe(sub *Subscription)
Unsubscribe disable a watcher
func (*EnvelopeWatcher) Watch ¶
func (w *EnvelopeWatcher) Watch(handler EnvelopeEventHandler, ids ...api.UUID) *Subscription
Watch starts watching for envelope status change and calls the given function on each event
type JobQueue ¶
type JobQueue struct {
// contains filtered or unexported fields
}
JobQueue is a persistent queue for an actor jobs
func (*JobQueue) SendNilToOut ¶
func (queue *JobQueue) SendNilToOut()
SendNilToOut sends a nil pointer to the output chan
type JobRunner ¶
type JobRunner struct { *ActorHandler // contains filtered or unexported fields }
type Options ¶
type Options struct { services.Options ActorStorage storage.ActorStorage EnvelopeStorage storage.EnvelopeStorage LogStorage storage.LogStorage ProcessManager *process.Manager JobStorage storage.JobStorage }
Options is used to create a Service
type RingBuffer ¶
type RingBuffer struct { Size int // Size of the Ringbuffer Container []*Job // Array container of objects Reader int // Reader position Writer int // Writer Position // contains filtered or unexported fields }
RingBuffer Structure
func NewRingBuffer ¶
func NewRingBuffer(size int) *RingBuffer
NewRingBuffer creates a new RingBuffer of initial size "size" Returns a pointer to the new RingBuffer
func (*RingBuffer) Count ¶
func (r *RingBuffer) Count() int
Count returns the number of elements in the buffer
func (*RingBuffer) Empty ¶
func (r *RingBuffer) Empty() bool
Empty returns true if the buffer is empty
func (*RingBuffer) PeekLast ¶
func (r *RingBuffer) PeekLast() (value *Job)
PeekLast returns the last inserted object if any
func (*RingBuffer) PushBack ¶
func (r *RingBuffer) PushBack(v *Job) error
PushBack object in front of the buffer
func (*RingBuffer) Read ¶
func (r *RingBuffer) Read() (value *Job)
Read single object from the RingBuffer
func (*RingBuffer) TakeBack ¶
func (r *RingBuffer) TakeBack() (value *Job)
TakeBack removes the last inserted object
func (*RingBuffer) Write ¶
func (r *RingBuffer) Write(v *Job) error
Write object into the RingBuffer
type Service ¶
type Service struct { services.BaseService Opts Options // contains filtered or unexported fields }
Service is the director service implementation The director service is responsible for calling the 'Process' function on the actors. It monitors the ProcessNodeReadyEvent events and schedule jobs When a actor is back online or a when a process is resumed, the corresponding jobs are resheduled The director service acts alone a is not (yet) able to cooperate with other director instances.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is returned by Watch
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe()
Unsubscribe stops the watcher