Documentation ¶
Overview ¶
Package queue provides several queue implementations, originally inspired by Kubernetes pkg/client/cache/fifo.
Index ¶
- func WithoutCancel() <-chan struct{}
- type BreakChan
- type Breakout
- type Copyable
- type DeadlinePolicy
- type Deadlined
- type DelayFIFO
- func (q *DelayFIFO) Add(d UniqueDelayed, rp ReplacementPolicy)
- func (q *DelayFIFO) Await(timeout time.Duration) UniqueID
- func (c *DelayFIFO) ContainedIDs() sets.String
- func (f *DelayFIFO) Delete(id string)
- func (f *DelayFIFO) Get(id string) (UniqueID, bool)
- func (f *DelayFIFO) List() []UniqueID
- func (q *DelayFIFO) Offer(d UniqueDeadlined, rp ReplacementPolicy) bool
- func (q *DelayFIFO) Pop(cancel <-chan struct{}) UniqueID
- type DelayQueue
- type Delayed
- type Entry
- type EventType
- type FIFO
- type HistoricalFIFO
- func (f *HistoricalFIFO) Add(v interface{}) error
- func (q *HistoricalFIFO) Await(timeout time.Duration) interface{}
- func (c *HistoricalFIFO) ContainedIDs() sets.String
- func (f *HistoricalFIFO) Delete(v interface{}) error
- func (f *HistoricalFIFO) Get(v interface{}) (interface{}, bool, error)
- func (f *HistoricalFIFO) GetByKey(id string) (interface{}, bool, error)
- func (f *HistoricalFIFO) List() []interface{}
- func (f *HistoricalFIFO) ListKeys() []string
- func (f *HistoricalFIFO) Poll(id string, t EventType) bool
- func (f *HistoricalFIFO) Pop(cancel <-chan struct{}) interface{}
- func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error
- func (f *HistoricalFIFO) Update(obj interface{}) error
- type Priority
- type ReplacementPolicy
- type UniqueCopyable
- type UniqueDeadlined
- type UniqueDelayed
- type UniqueID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithoutCancel ¶ added in v0.3.0
func WithoutCancel() <-chan struct{}
WithoutCancel returns a chan that is never closed, so receiving from it always blocks.
Types ¶
type BreakChan ¶
type BreakChan <-chan struct{}
No objects are ever expected to be sent over this channel. References to BreakChan instances may be nil (always blocking). Signalling over this channel is performed by closing the channel. As such there can only ever be a single signal sent over the lifetime of the channel.
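The close-to-signal convention described above can be sketched with the standard library alone. This is a minimal illustration, not code from this package: `signalled` is a hypothetical helper, and only the `BreakChan` declaration is taken from the documentation.

```go
package main

import "fmt"

// BreakChan mirrors the declaration shown in this documentation.
type BreakChan <-chan struct{}

// signalled is a hypothetical helper that reports, without blocking,
// whether ch has been closed. A nil channel is never ready to receive,
// so it always reports false.
func signalled(ch BreakChan) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	raw := make(chan struct{})
	var ch BreakChan = raw
	var none BreakChan // nil: always blocks

	fmt.Println(signalled(ch), signalled(none)) // false false
	close(raw)                                  // the single signal this channel can ever carry
	fmt.Println(signalled(ch), signalled(none)) // true false
}
```

Because a closed channel stays closed, every receiver observes the signal, and it cannot be sent a second time.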
type Breakout ¶
type Breakout interface {
	// return a channel that signals early departure from a blocking delay
	Breaker() BreakChan
}
Breakout is an optional interface to be implemented by Delayed objects. Returning a nil channel from Breaker() results in waiting the full delay duration.
type Copyable ¶
type Copyable interface {
	// return an independent copy (deep clone) of the current object
	Copy() Copyable
}
type DeadlinePolicy ¶
type DeadlinePolicy int
Decide whether a pre-existing deadline for an item in a delay-queue should be updated if an attempt is made to offer/add a new deadline for said item. Whether the deadline changes or not has zero impact on the data blob associated with the entry in the queue.
const (
	PreferLatest DeadlinePolicy = iota
	PreferEarliest
)
type DelayFIFO ¶
type DelayFIFO struct {
// contains filtered or unexported fields
}
If multiple adds/updates of a single item happen while an item is in the queue before it has been processed, it will only be processed once, and when it is processed, the most recent version will be processed. Items are popped in order of their priority, currently controlled by a delay or deadline assigned to each item in the queue.
func NewDelayFIFO ¶
func NewDelayFIFO() *DelayFIFO
func (*DelayFIFO) Add ¶
func (q *DelayFIFO) Add(d UniqueDelayed, rp ReplacementPolicy)
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*DelayFIFO) ContainedIDs ¶
ContainedIDs returns a sets.String containing all IDs of the stored items. This is a snapshot of a moment in time; keep in mind that other goroutines can add or remove items after you call this.
func (*DelayFIFO) Delete ¶
Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not their priority order.
func (*DelayFIFO) Offer ¶
func (q *DelayFIFO) Offer(d UniqueDeadlined, rp ReplacementPolicy) bool
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue is a concurrency-safe, deadline-oriented queue that returns items after their delay period has expired.
func NewDelayQueue ¶
func NewDelayQueue() *DelayQueue
func (*DelayQueue) Add ¶
func (q *DelayQueue) Add(d Delayed)
func (*DelayQueue) Offer ¶
func (q *DelayQueue) Offer(d Deadlined) bool
If there's a deadline reported by d.Deadline() then `d` is added to the queue and this func returns true.
func (*DelayQueue) Pop ¶
func (q *DelayQueue) Pop() interface{}
Pop waits for the delay of the next item in the queue to expire, blocking if there are no items in the queue. It does not guarantee first-come-first-served ordering with respect to clients.
type Entry ¶
type Entry interface {
	Copyable
	Value() UniqueCopyable
	// types is a logically OR'd combination of EventType, e.g. ADD_EVENT|UPDATE_EVENT
	Is(types EventType) bool
}
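The OR'd event mask accepted by Is can be sketched as a bit-flag test. The constant values below are assumptions made for illustration: this documentation names ADD_EVENT and UPDATE_EVENT but does not show their definitions, and `otherEvent` and `matches` are hypothetical.

```go
package main

import "fmt"

// EventType stands in for the package's type; the bit values are assumed.
type EventType int

const (
	ADD_EVENT EventType = 1 << iota
	UPDATE_EVENT
	otherEvent // hypothetical third event kind, for contrast only
)

// matches is a hypothetical helper: it reports whether event is one of
// the kinds selected by mask.
func matches(event, mask EventType) bool {
	return event&mask != 0
}

func main() {
	mask := ADD_EVENT | UPDATE_EVENT
	fmt.Println(matches(ADD_EVENT, mask))    // true
	fmt.Println(matches(UPDATE_EVENT, mask)) // true
	fmt.Println(matches(otherEvent, mask))   // false
}
```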
type FIFO ¶
type FIFO interface {
	cache.Store

	// Pop waits until an item is ready and returns it. If multiple items are
	// ready, they are returned in the order in which they were added/updated.
	// The item is removed from the queue (and the store) before it is returned,
	// so if you don't successfully process it, you need to add it back with Add().
	Pop(cancel <-chan struct{}) interface{}

	// Await attempts to Pop within the given interval; upon success the non-nil
	// item is returned, otherwise nil
	Await(timeout time.Duration) interface{}

	// Is there an entry for the id that matches the event mask?
	Poll(id string, types EventType) bool
}
type HistoricalFIFO ¶
type HistoricalFIFO struct {
// contains filtered or unexported fields
}
HistoricalFIFO receives adds and updates from a Reflector, and puts them in a queue for FIFO order processing. If multiple adds/updates of a single item happen while an item is in the queue before it has been processed, it will only be processed once, and when it is processed, the most recent version will be processed. This can't be done with a channel.
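The coalescing behavior described above (one processing pass per item, always with the most recent version) can be sketched with a map and an ordered list of IDs. `coalescingFIFO` is a hypothetical, single-goroutine illustration; it omits the locking, blocking Pop, and event history that the real type would need.

```go
package main

import "fmt"

// coalescingFIFO is a hypothetical, non-concurrent sketch.
type coalescingFIFO struct {
	items map[string]string // id -> most recent value
	order []string          // pending ids, oldest first
}

func newCoalescingFIFO() *coalescingFIFO {
	return &coalescingFIFO{items: map[string]string{}}
}

// add records the latest value and enqueues the id only if it is not
// already pending, so repeated updates keep the original queue position.
func (f *coalescingFIFO) add(id, value string) {
	if _, pending := f.items[id]; !pending {
		f.order = append(f.order, id)
	}
	f.items[id] = value
}

// pop returns the oldest pending id together with its latest value.
func (f *coalescingFIFO) pop() (id, value string, ok bool) {
	if len(f.order) == 0 {
		return "", "", false
	}
	id = f.order[0]
	f.order = f.order[1:]
	value = f.items[id]
	delete(f.items, id)
	return id, value, true
}

func main() {
	f := newCoalescingFIFO()
	f.add("a", "v1")
	f.add("b", "v1")
	f.add("a", "v2") // coalesced with the pending "a"
	for {
		id, v, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(id, v)
	}
	// Prints:
	// a v2
	// b v1
}
```

A plain channel cannot do this because a value already sent cannot be replaced or deduplicated before it is received.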
func NewHistorical ¶
func NewHistorical(ch chan<- Entry) *HistoricalFIFO
NewHistorical returns a Store which can be used to queue up items to process. If a non-nil channel ch is provided, then modifications to the FIFO are delivered on that channel.
func (*HistoricalFIFO) Add ¶
func (f *HistoricalFIFO) Add(v interface{}) error
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*HistoricalFIFO) Await ¶
func (q *HistoricalFIFO) Await(timeout time.Duration) interface{}
Await attempts to Pop within the given timeout; upon success the non-nil item is returned, otherwise nil.
func (*HistoricalFIFO) ContainedIDs ¶
func (c *HistoricalFIFO) ContainedIDs() sets.String
ContainedIDs returns a sets.String containing all IDs of the stored items. This is a snapshot of a moment in time; keep in mind that other goroutines can add or remove items after you call this.
func (*HistoricalFIFO) Delete ¶
func (f *HistoricalFIFO) Delete(v interface{}) error
Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.
func (*HistoricalFIFO) Get ¶
func (f *HistoricalFIFO) Get(v interface{}) (interface{}, bool, error)
Get returns the requested item, or sets exists=false.
func (*HistoricalFIFO) GetByKey ¶
func (f *HistoricalFIFO) GetByKey(id string) (interface{}, bool, error)
GetByKey returns the item stored under the given id, or sets exists=false.
func (*HistoricalFIFO) List ¶
func (f *HistoricalFIFO) List() []interface{}
List returns a list of all the items.
func (*HistoricalFIFO) ListKeys ¶
func (f *HistoricalFIFO) ListKeys() []string
ListKeys returns a list of the keys of all the items.
func (*HistoricalFIFO) Poll ¶
func (f *HistoricalFIFO) Poll(id string, t EventType) bool
Poll reports whether there is an entry for the given id that matches the event mask t.
func (*HistoricalFIFO) Pop ¶
func (f *HistoricalFIFO) Pop(cancel <-chan struct{}) interface{}
Pop blocks until either there is an item available to dequeue or else the specified cancel chan is closed. Callers that have no interest in providing a cancel chan should specify nil, or else WithoutCancel() (for readability).
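The cancellation contract can be sketched with a select over an item channel and a cancel channel. Everything here is a stand-in for illustration: `pop` is a hypothetical function, `withoutCancel` only imitates what WithoutCancel() is documented to return, and the second return value is an addition of this sketch (the real Pop returns a single interface{}).

```go
package main

import "fmt"

// withoutCancel imitates WithoutCancel(): a channel that is never closed.
func withoutCancel() <-chan struct{} {
	return make(chan struct{})
}

// pop is a hypothetical helper: it blocks until an item is available or
// cancel is closed. A nil cancel channel never fires.
func pop(items <-chan string, cancel <-chan struct{}) (string, bool) {
	select {
	case it := <-items:
		return it, true
	case <-cancel:
		return "", false
	}
}

func main() {
	items := make(chan string, 1)
	items <- "pod-a"
	v, ok := pop(items, withoutCancel())
	fmt.Println(v, ok) // pod-a true

	cancel := make(chan struct{})
	close(cancel) // give up waiting
	_, ok = pop(items, cancel)
	fmt.Println(ok) // false
}
```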
func (*HistoricalFIFO) Replace ¶
func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error
func (*HistoricalFIFO) Update ¶
func (f *HistoricalFIFO) Update(obj interface{}) error
Update is the same as Add in this implementation.
type ReplacementPolicy ¶
type ReplacementPolicy int
Decide whether a pre-existing data blob in a delay-queue should be replaced if an attempt is made to add/offer a new data blob in its place. Whether the data is replaced has no bearing on the deadline (priority) of the item in the queue.
const (
	KeepExisting ReplacementPolicy = iota
	ReplaceExisting
)
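How the two policies might affect a stored data blob is sketched below. The type and constants mirror the declarations above; `put` and the map-backed store are hypothetical, and the sketch deliberately ignores deadlines, which the policy is documented not to affect.

```go
package main

import "fmt"

// ReplacementPolicy and its constants mirror the declarations above.
type ReplacementPolicy int

const (
	KeepExisting ReplacementPolicy = iota
	ReplaceExisting
)

// put is a hypothetical helper: it stores blob under id, leaving any
// existing blob untouched when the policy is KeepExisting.
func put(store map[string]string, id, blob string, rp ReplacementPolicy) {
	if _, exists := store[id]; exists && rp == KeepExisting {
		return
	}
	store[id] = blob
}

func main() {
	store := map[string]string{}
	put(store, "a", "old", KeepExisting)
	put(store, "a", "new", KeepExisting)
	fmt.Println(store["a"]) // old

	put(store, "a", "newer", ReplaceExisting)
	fmt.Println(store["a"]) // newer
}
```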