director

package
v0.0.0-...-cdef871 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConfigEntryDelayMax            = "xbus.director.delay.max"
	ConfigEntryDelayIncreaseFactor = "xbus.director.delay.increase-factor"
)
View Source
const (
	// DirectorQueue is the name we use to queueSub to a queue with the director
	DirectorQueue = "director_queue"
)

Variables

View Source
var (
	ConfigEntryDelayMaxDefaultValue            = 30 * time.Minute
	ConfigEntryDelayIncreaseFactorDefaultValue = 100 // 100%
)
View Source
var ErrBufferFull = errors.New("RingBuffer is full")

ErrBufferFull is the error for when the buffer is full

Functions

This section is empty.

Types

type ActorHandler

type ActorHandler struct {
	DelayMax            time.Duration
	DelayIncreaseFactor int // expressed in 0.01
	// contains filtered or unexported fields
}

ActorHandler is responsible for calling the 'Process' method of the actors and handling the results

func (*ActorHandler) CancelProcessJobs

func (h *ActorHandler) CancelProcessJobs(id api.UUID, soft bool)

func (*ActorHandler) Close

func (h *ActorHandler) Close()

func (*ActorHandler) Start

func (h *ActorHandler) Start()

func (*ActorHandler) Stop

func (h *ActorHandler) Stop()

func (*ActorHandler) Submit

func (h *ActorHandler) Submit(job *Job)

type ActorHandlerConfig

type ActorHandlerConfig struct {
	Log             zerolog.Logger
	Conn            *nats.Conn
	ActorStorage    storage.ActorStorage
	JobStorage      storage.JobStorage
	ProcessManager  *process.Manager
	EnvelopeStorage storage.EnvelopeStorage
	ActorMap        *svc_tools.ActorMap
	Director        directorServer
	EnvelopeWatcher *EnvelopeWatcher
}

ActorHandlerConfig holds the settings for instantiating an ActorHandler

type ActorSet

type ActorSet struct {
	// contains filtered or unexported fields
}

ActorSet. This API is NOT thread-safe.

func (*ActorSet) CancelProcessJobs

func (s *ActorSet) CancelProcessJobs(processID api.UUID, soft bool)

CancelProcessJobs cancels all running jobs that belong to the given process

func (*ActorSet) Close

func (s *ActorSet) Close()

Close closes all the underlying handlers

func (*ActorSet) Get

func (s *ActorSet) Get(id api.UUID) *ActorHandler

Get returns the handler of a given actor, or nil if it does not exist

func (*ActorSet) GetOrCreate

func (s *ActorSet) GetOrCreate(id api.UUID) *ActorHandler

GetOrCreate returns the handler of a given actor, creating it if needed

func (*ActorSet) SetOffline

func (s *ActorSet) SetOffline(id api.UUID)

SetOffline stops the handler of the given actor

func (*ActorSet) SetOnline

func (s *ActorSet) SetOnline(id api.UUID)

SetOnline starts the handler of the given actor

func (*ActorSet) Submit

func (s *ActorSet) Submit(job *Job)

Submit dispatches a job to the correct actor handler if it exists

type ActorSetConfig

type ActorSetConfig struct {
	Log             zerolog.Logger
	Conn            *nats.Conn
	ActorStorage    storage.ActorStorage
	JobStorage      storage.JobStorage
	ProcessManager  *process.Manager
	EnvelopeStorage storage.EnvelopeStorage
	ActorMap        *svc_tools.ActorMap
	Director        directorServer
	EnvelopeWatcher *EnvelopeWatcher
}

ActorSetConfig holds the settings for instantiating an ActorSet

type CancelProcessJobsArgs

type CancelProcessJobsArgs struct {
	ProcessID api.UUID
	Soft      bool
}

type EnvelopeEventHandler

type EnvelopeEventHandler func(api.EnvelopeEvent)

EnvelopeEventHandler is a function that handles an api.EnvelopeEvent

type EnvelopeWatcher

type EnvelopeWatcher struct {
	// contains filtered or unexported fields
}

EnvelopeWatcher can monitor status changes on several envelopes

func NewEnvelopeWatcher

func NewEnvelopeWatcher() *EnvelopeWatcher

NewEnvelopeWatcher initializes an EnvelopeWatcher

func (*EnvelopeWatcher) PushEvent

func (w *EnvelopeWatcher) PushEvent(event api.EnvelopeEvent)

PushEvent pushes an event

func (*EnvelopeWatcher) Unsubscribe

func (w *EnvelopeWatcher) Unsubscribe(sub *Subscription)

Unsubscribe disables a watcher

func (*EnvelopeWatcher) Watch

func (w *EnvelopeWatcher) Watch(handler EnvelopeEventHandler, ids ...api.UUID) *Subscription

Watch starts watching for envelope status changes and calls the given function on each event

type Job

type Job struct {
	storage.Job
	// contains filtered or unexported fields
}

Job ...

func (*Job) SaveState

func (job *Job) SaveState()

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue is a persistent queue for an actor's jobs

func (*JobQueue) Add

func (queue *JobQueue) Add(job *Job)

Add a job to the queue. The job must already exist in the database

func (*JobQueue) C

func (queue *JobQueue) C() <-chan *Job

C returns a chan of jobs

func (*JobQueue) Close

func (queue *JobQueue) Close()

Close the queue resources and routine

func (*JobQueue) PushBack

func (queue *JobQueue) PushBack(job *Job)

PushBack adds a job at the front of the queue

func (*JobQueue) SendNilToOut

func (queue *JobQueue) SendNilToOut()

SendNilToOut sends a nil pointer to the output chan

type JobRunner

type JobRunner struct {
	*ActorHandler
	// contains filtered or unexported fields
}

type Options

type Options struct {
	services.Options

	ActorStorage    storage.ActorStorage
	EnvelopeStorage storage.EnvelopeStorage
	LogStorage      storage.LogStorage
	ProcessManager  *process.Manager
	JobStorage      storage.JobStorage
}

Options is used to create a Service

func (Options) NewService

func (o Options) NewService() *Service

NewService returns a new Service

type RingBuffer

type RingBuffer struct {
	Size      int    // Size of the Ringbuffer
	Container []*Job // Array container of objects
	Reader    int    // Reader position
	Writer    int    // Writer Position
	// contains filtered or unexported fields
}

RingBuffer Structure

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

NewRingBuffer creates a new RingBuffer of initial size "size". Returns a pointer to the new RingBuffer.

func (*RingBuffer) Count

func (r *RingBuffer) Count() int

Count returns the number of elements in the buffer

func (*RingBuffer) Empty

func (r *RingBuffer) Empty() bool

Empty returns true if the buffer is empty

func (*RingBuffer) Full

func (r *RingBuffer) Full() bool

Full returns true if the buffer is full

func (*RingBuffer) PeekLast

func (r *RingBuffer) PeekLast() (value *Job)

PeekLast returns the last inserted object if any

func (*RingBuffer) PushBack

func (r *RingBuffer) PushBack(v *Job) error

PushBack puts an object at the front of the buffer

func (*RingBuffer) Read

func (r *RingBuffer) Read() (value *Job)

Read single object from the RingBuffer

func (*RingBuffer) TakeBack

func (r *RingBuffer) TakeBack() (value *Job)

TakeBack removes the last inserted object

func (*RingBuffer) Write

func (r *RingBuffer) Write(v *Job) error

Write object into the RingBuffer

type Service

type Service struct {
	services.BaseService
	Opts Options
	// contains filtered or unexported fields
}

Service is the director service implementation. The director service is responsible for calling the 'Process' function on the actors. It monitors the ProcessNodeReadyEvent events and schedules jobs. When an actor is back online or when a process is resumed, the corresponding jobs are rescheduled. The director service acts alone and is not (yet) able to cooperate with other director instances.

func New

func New(options Options) *Service

New returns a new Service

func (*Service) Shutdown

func (s *Service) Shutdown() error

Shutdown the service

func (*Service) Startup

func (s *Service) Startup() error

Startup initializes all the subscriptions

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is returned by Watch

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe stops the watcher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL