Documentation ¶
Index ¶
- type BucketFunc
- type BucketKey
- type ChainThrottler
- type GetSyncLimit
- type IsWorkflowDeleted
- type Key
- type LockKind
- type LockName
- type Manager
- func (cm *Manager) CheckWorkflowExistence()
- func (cm *Manager) Initialize(wfs []wfv1.Workflow)
- func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization)
- func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool
- func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error)
- type NextWorkflow
- type PriorityMutex
- type PrioritySemaphore
- type QueueFunc
- type Semaphore
- type Throttler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BucketFunc ¶ added in v3.1.0
var NamespaceBucket BucketFunc = func(key Key) BucketKey { namespace, _, _ := cache.SplitMetaNamespaceKey(key) return namespace }
var SingleBucket BucketFunc = func(key Key) BucketKey { return "" }
type ChainThrottler ¶ added in v3.1.0
type ChainThrottler []Throttler
func (ChainThrottler) Add ¶ added in v3.1.0
func (c ChainThrottler) Add(key Key, priority int32, creationTime time.Time)
func (ChainThrottler) Admit ¶ added in v3.1.0
func (c ChainThrottler) Admit(key Key) bool
func (ChainThrottler) Init ¶ added in v3.1.9
func (c ChainThrottler) Init(wfs []wfv1.Workflow) error
func (ChainThrottler) Remove ¶ added in v3.1.0
func (c ChainThrottler) Remove(key Key)
type GetSyncLimit ¶
type IsWorkflowDeleted ¶
type LockName ¶
func DecodeLockName ¶
func GetLockName ¶
func GetLockName(sync *v1alpha1.Synchronization, namespace string) (*LockName, error)
func NewLockName ¶
func (*LockName) EncodeName ¶
func (*LockName) ValidateEncoding ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewLockManager ¶
func NewLockManager(getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, isWFDeleted IsWorkflowDeleted) *Manager
func (*Manager) CheckWorkflowExistence ¶
func (cm *Manager) CheckWorkflowExistence()
func (*Manager) Initialize ¶
func (*Manager) TryAcquire ¶
func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error)
TryAcquire tries to acquire the lock from semaphore. It returns status of acquiring a lock , status of Workflow status updated, waiting message if lock is not available and any error encountered
type NextWorkflow ¶
type NextWorkflow func(string)
type PriorityMutex ¶
type PriorityMutex struct {
// contains filtered or unexported fields
}
func NewMutex ¶
func NewMutex(name string, nextWorkflow NextWorkflow) *PriorityMutex
NewMutex creates new mutex lock object name of the mutex callbackFunc is a release notification function.
type PrioritySemaphore ¶
type PrioritySemaphore struct {
// contains filtered or unexported fields
}
func NewSemaphore ¶
func NewSemaphore(name string, limit int, nextWorkflow NextWorkflow, lockType string) *PrioritySemaphore
type Throttler ¶
type Throttler interface { Init(wfs []wfv1.Workflow) error Add(key Key, priority int32, creationTime time.Time) // Admit returns if the item should be processed. Admit(key Key) bool // Remove notifies throttler that item processing is no longer needed Remove(key Key) }
Throttler allows the controller to limit number of items it is processing in parallel. Items are processed in priority order, and one processing starts, other items (including higher-priority items) will be kept pending until the processing is complete. Implementations should be idempotent.
func NewThrottler ¶
func NewThrottler(parallelism int, bucketFunc BucketFunc, queue QueueFunc) Throttler
NewThrottler returns a throttle that only runs `parallelism` items at once. When an item may need processing, `queue` is invoked.