sync

package
v3.0.0-...-8a51905 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BucketFunc

type BucketFunc func(Key) BucketKey
var NamespaceBucket BucketFunc = func(key Key) BucketKey {
	namespace, _, _ := cache.SplitMetaNamespaceKey(key)
	return namespace
}
var SingleBucket BucketFunc = func(key Key) BucketKey { return "" }

type BucketKey

type BucketKey = string

type ChainThrottler

type ChainThrottler []Throttler

func (ChainThrottler) Add

func (c ChainThrottler) Add(key Key, priority int32, creationTime time.Time)

func (ChainThrottler) Admit

func (c ChainThrottler) Admit(key Key) bool

func (ChainThrottler) Init

func (c ChainThrottler) Init(wfs []wfv1.Workflow) error

func (ChainThrottler) Remove

func (c ChainThrottler) Remove(key Key)

type GetSyncLimit

type GetSyncLimit func(string) (int, error)

type IsWorkflowDeleted

type IsWorkflowDeleted func(string) bool

type Key

type Key = string

type LockKind

type LockKind string
const (
	LockKindConfigMap LockKind = "ConfigMap"
	LockKindMutex     LockKind = "Mutex"
)

type LockName

type LockName struct {
	Namespace    string
	ResourceName string
	Key          string
	Kind         LockKind
}

func DecodeLockName

func DecodeLockName(lockName string) (*LockName, error)

func GetLockName

func GetLockName(sync *v1alpha1.Synchronization, namespace string) (*LockName, error)

func NewLockName

func NewLockName(namespace, resourceName, lockKey string, kind LockKind) *LockName

func (*LockName) EncodeName

func (ln *LockName) EncodeName() string

func (*LockName) Validate

func (ln *LockName) Validate() error

func (*LockName) ValidateEncoding

func (ln *LockName) ValidateEncoding(encoding string) string

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewLockManager

func NewLockManager(getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, isWFDeleted IsWorkflowDeleted) *Manager

func (*Manager) CheckWorkflowExistence

func (cm *Manager) CheckWorkflowExistence()

func (*Manager) Initialize

func (cm *Manager) Initialize(wfs []wfv1.Workflow)

func (*Manager) Release

func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization)

func (*Manager) ReleaseAll

func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool

func (*Manager) TryAcquire

func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error)

TryAcquire tries to acquire the lock from semaphore. It returns status of acquiring a lock , status of Workflow status updated, waiting message if lock is not available and any error encountered

type NextWorkflow

type NextWorkflow func(string)

type PriorityMutex

type PriorityMutex struct {
	// contains filtered or unexported fields
}

func NewMutex

func NewMutex(name string, nextWorkflow NextWorkflow) *PriorityMutex

NewMutex creates new mutex lock object name of the mutex callbackFunc is a release notification function.

type PrioritySemaphore

type PrioritySemaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore

func NewSemaphore(name string, limit int, nextWorkflow NextWorkflow, lockType string) *PrioritySemaphore

type QueueFunc

type QueueFunc func(Key)

type Semaphore

type Semaphore interface {
	// contains filtered or unexported methods
}

type Throttler

type Throttler interface {
	Init(wfs []wfv1.Workflow) error
	Add(key Key, priority int32, creationTime time.Time)
	// Admit returns if the item should be processed.
	Admit(key Key) bool
	// Remove notifies throttler that item processing is no longer needed
	Remove(key Key)
}

Throttler allows the controller to limit number of items it is processing in parallel. Items are processed in priority order, and one processing starts, other items (including higher-priority items) will be kept pending until the processing is complete. Implementations should be idempotent.

func NewThrottler

func NewThrottler(parallelism int, bucketFunc BucketFunc, queue QueueFunc) Throttler

NewThrottler returns a throttle that only runs `parallelism` items at once. When an item may need processing, `queue` is invoked.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL