Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncRequestProcessWorker ¶
type AsyncRequestProcessWorker struct {
// contains filtered or unexported fields
}
AsyncRequestProcessWorker is the worker that processes async requests.
func New ¶
func New(options Options, sm manager.StatusManager, qu queue.Client, ctrlRegistry *ControllerRegistry) *AsyncRequestProcessWorker
New creates a new AsyncRequestProcessWorker instance.
func (*AsyncRequestProcessWorker) Start ¶
func (w *AsyncRequestProcessWorker) Start(ctx context.Context) error
Start starts the worker's message loop. The loop dequeues messages from a queue and processes them concurrently; for each message it handles deduplication, updates the resource and operation status, and runs the operation. Start returns an error if it fails to start the dequeuer.
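The loop described above can be pictured with a minimal, self-contained sketch. It is not the package's implementation: the message type, processAll, and runDemo are hypothetical names, deduplication is reduced to an in-memory set of message IDs, and status updates are left out. It only illustrates bounded concurrent processing, the kind of limit that MaxOperationConcurrency suggests.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a queue message; it is not the
// package's real message type.
type message struct {
	ID string
}

// processAll drains msgs and runs handle for each message on its own
// goroutine, keeping at most maxConcurrency handlers in flight
// (maxConcurrency must be at least 1). Messages whose ID was already seen
// are skipped, a simplified form of deduplication. It returns the number
// of messages dispatched, after all handlers have finished.
func processAll(ctx context.Context, msgs <-chan message, maxConcurrency int, handle func(context.Context, message)) int {
	sem := make(chan struct{}, maxConcurrency) // counting semaphore
	seen := make(map[string]bool)              // only touched by this goroutine
	var wg sync.WaitGroup
	dispatched := 0

	for {
		select {
		case <-ctx.Done():
			wg.Wait()
			return dispatched
		case m, ok := <-msgs:
			if !ok {
				wg.Wait()
				return dispatched
			}
			if seen[m.ID] {
				continue // duplicate delivery: skip it
			}
			seen[m.ID] = true
			dispatched++

			sem <- struct{}{} // blocks while maxConcurrency handlers are running
			wg.Add(1)
			go func(m message) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				handle(ctx, m)
			}(m)
		}
	}
}

// runDemo feeds the given IDs through processAll and reports how many
// distinct messages were dispatched.
func runDemo(ids []string, maxConcurrency int) int {
	msgs := make(chan message, len(ids))
	for _, id := range ids {
		msgs <- message{ID: id}
	}
	close(msgs)
	return processAll(context.Background(), msgs, maxConcurrency, func(_ context.Context, m message) {
		fmt.Println("processing", m.ID)
	})
}

func main() {
	// "a" appears twice, so only three messages are dispatched.
	fmt.Println("handled:", runDemo([]string{"a", "b", "a", "c"}, 2))
}
```

The buffered channel acts as a counting semaphore, so the loop itself stalls once the concurrency limit is reached instead of spawning unbounded goroutines. The order of the "processing" lines is not deterministic.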
type ControllerFactoryFunc ¶
type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error)
ControllerFactoryFunc is a factory function to create a controller.
type ControllerRegistry ¶
type ControllerRegistry struct {
// contains filtered or unexported fields
}
ControllerRegistry is a registry for async controllers.
func NewControllerRegistry ¶
func NewControllerRegistry() *ControllerRegistry
NewControllerRegistry creates a ControllerRegistry instance.
func (*ControllerRegistry) Get ¶
func (h *ControllerRegistry) Get(operationType v1.OperationType) (ctrl.Controller, error)
Get returns the async controller instance registered for the given operation type.
func (*ControllerRegistry) Register ¶
func (h *ControllerRegistry) Register(resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error
Register registers a controller for a specific resource type and operation method.
Controllers registered using Register will be cached by the registry and the same instance will be reused.
func (*ControllerRegistry) RegisterDefault ¶ added in v0.41.0
func (h *ControllerRegistry) RegisterDefault(factoryFn ControllerFactoryFunc, opts ctrl.Options) error
RegisterDefault registers a default controller that will be used when no other controller is found.
The default controller will be used when Get is called with an operation type that has no registered controller. The default controller will not be cached by the registry.
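The difference between cached and default controllers can be modelled with a small sketch. Everything here is hypothetical (the registry, controller, and factoryFunc types, the string operation keys, and the choice to build cached controllers at registration time); it is an illustration of the documented behavior, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// controller and factoryFunc are hypothetical stand-ins for ctrl.Controller
// and ControllerFactoryFunc.
type controller struct{ name string }

type factoryFunc func() (*controller, error)

// registry is an illustrative model of the documented behavior only.
type registry struct {
	cached         map[string]*controller // one shared instance per operation type
	defaultFactory factoryFunc            // invoked on every fallback lookup
}

func newRegistry() *registry {
	return &registry{cached: map[string]*controller{}}
}

// register builds the controller once and caches it (assumption: the
// instance is created at registration time).
func (r *registry) register(operationType string, fn factoryFunc) error {
	c, err := fn()
	if err != nil {
		return err
	}
	r.cached[operationType] = c
	return nil
}

// registerDefault stores only the factory; no instance is cached.
func (r *registry) registerDefault(fn factoryFunc) {
	r.defaultFactory = fn
}

// get returns the cached controller if one exists, otherwise a fresh
// controller from the default factory, otherwise an error.
func (r *registry) get(operationType string) (*controller, error) {
	if c, ok := r.cached[operationType]; ok {
		return c, nil
	}
	if r.defaultFactory != nil {
		return r.defaultFactory()
	}
	return nil, errors.New("no controller registered for " + operationType)
}

func main() {
	r := newRegistry()
	// The operation type string below is made up for the example.
	_ = r.register("Example.Provider/widgets|PUT", func() (*controller, error) {
		return &controller{name: "widgets"}, nil
	})
	r.registerDefault(func() (*controller, error) {
		return &controller{name: "default"}, nil
	})

	a1, _ := r.get("Example.Provider/widgets|PUT")
	a2, _ := r.get("Example.Provider/widgets|PUT")
	fmt.Println("registered controller reused:", a1 == a2) // true

	d1, _ := r.get("Example.Provider/gadgets|DELETE")
	d2, _ := r.get("Example.Provider/gadgets|DELETE")
	fmt.Println("default controller reused:", d1 == d2) // false
}
```

In this model a registered controller is shared across lookups, so it should be safe for concurrent use, while each fallback lookup pays the cost of constructing a new default controller.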
type Options ¶
type Options struct {
	// MaxOperationConcurrency is the maximum concurrency to process async request operation.
	MaxOperationConcurrency int
	// MaxOperationRetryCount is the maximum retry count to process async request operation.
	MaxOperationRetryCount int
	// MessageExtendMargin is the margin duration for clock skew before extending message lock.
	MessageExtendMargin time.Duration
	// MinMessageLockDuration is the minimum duration of message lock duration.
	MinMessageLockDuration time.Duration
	// DeduplicationDuration is the duration for the deduplication detection.
	DeduplicationDuration time.Duration
	// DequeueIntervalDuration is the duration for the dequeue interval.
	DequeueIntervalDuration time.Duration
}
Options configures AsyncRequestProcessWorker.
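As a rough illustration of how these fields might be populated, the sketch below declares a local copy of the struct so that it compiles without the package's import path. The function exampleOptions and every value in it are assumptions chosen for the example; they are not the package's defaults or recommendations.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a local mirror of the documented struct, declared here only so
// the sketch is self-contained.
type Options struct {
	MaxOperationConcurrency int
	MaxOperationRetryCount  int
	MessageExtendMargin     time.Duration
	MinMessageLockDuration  time.Duration
	DeduplicationDuration   time.Duration
	DequeueIntervalDuration time.Duration
}

// exampleOptions returns illustrative values only (hypothetical, not defaults).
func exampleOptions() Options {
	return Options{
		MaxOperationConcurrency: 10,
		MaxOperationRetryCount:  3,
		MessageExtendMargin:     30 * time.Second,
		MinMessageLockDuration:  5 * time.Minute,
		DeduplicationDuration:   30 * time.Second,
		DequeueIntervalDuration: 200 * time.Millisecond,
	}
}

func main() {
	fmt.Printf("%+v\n", exampleOptions())
}
```

Durations are ordinary time.Duration values, so they are written as a multiple of a unit constant such as time.Second rather than as bare integers.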
type Service ¶
type Service struct {
	// DatabaseClient is database client.
	DatabaseClient database.Client
	// OperationStatusManager is the manager of the operation status.
	OperationStatusManager manager.StatusManager
	// Options configures options for the async worker.
	Options Options
	// QueueClient is the queue client.
	QueueClient queue.Client
	// contains filtered or unexported fields
}
Service is the base worker service implementation that initializes and starts the worker. All exported fields should be initialized by the caller.
func (*Service) Controllers ¶
func (s *Service) Controllers() *ControllerRegistry
Controllers returns the controller registry for the worker service.