queue

package
v1.3.0-alpha.3....-c20dcfc
Published: May 16, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package queue provides several queue implementations, originally inspired by Kubernetes pkg/client/cache/fifo.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithoutCancel

func WithoutCancel() <-chan struct{}

WithoutCancel returns a channel that may never be closed, so receiving from it always blocks.
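To illustrate why such a channel "always blocks", here is a minimal sketch. The names `neverCancel`, `cancelled`, and `shortWait` are hypothetical stand-ins and not the package's implementation. A channel that nothing closes (or a nil channel) never becomes ready in a `select`, so only the other cases can fire.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait is an arbitrary timeout chosen for this sketch.
const shortWait = 10 * time.Millisecond

// neverCancel is a hypothetical stand-in for queue.WithoutCancel: it returns
// a receive-only channel that nothing ever closes or sends on.
func neverCancel() <-chan struct{} {
	return make(chan struct{})
}

// cancelled reports whether cancel fires before wait elapses.
func cancelled(cancel <-chan struct{}, wait time.Duration) bool {
	select {
	case <-cancel:
		return true
	case <-time.After(wait):
		return false
	}
}

func main() {
	fmt.Println(cancelled(neverCancel(), shortWait)) // false
	fmt.Println(cancelled(nil, shortWait))           // false: a nil channel blocks too

	closed := make(chan struct{})
	close(closed)
	fmt.Println(cancelled(closed, shortWait)) // true
}
```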

Types

type BreakChan

type BreakChan <-chan struct{}

No objects are ever expected to be sent over this channel. References to BreakChan instances may be nil (always blocking). Signalling over this channel is performed by closing the channel. As such there can only ever be a single signal sent over the lifetime of the channel.
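The close-to-signal convention described above can be sketched as follows. `BreakChan` is redeclared locally so the example compiles on its own, and `fired` is a hypothetical helper that is not part of the package.

```go
package main

import "fmt"

// BreakChan mirrors the type shown above, redeclared locally so that the
// sketch is self-contained.
type BreakChan <-chan struct{}

// fired reports, without blocking, whether the channel has been closed.
func fired(b BreakChan) bool {
	select {
	case <-b:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan struct{})
	b := BreakChan(ch)

	fmt.Println(fired(b))   // false: not closed yet
	fmt.Println(fired(nil)) // false: a nil BreakChan never fires

	close(ch)             // the single signal; closing again would panic
	fmt.Println(fired(b)) // true
	fmt.Println(fired(b)) // true: every later receive also succeeds
}
```

Because a closed channel stays closed, any number of observers see the same one-time signal.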

type Breakout

type Breakout interface {
	// return a channel that signals early departure from a blocking delay
	Breaker() BreakChan
}

Breakout is an optional interface to be implemented by Delayed objects. Returning a nil channel from Breaker() results in waiting the full delay duration.
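A consumer might honor this optional interface roughly as sketched below. The interfaces are redeclared locally, while `task`, `waitFor`, `shortDelay`, and `longDelay` are hypothetical names introduced for illustration. This is not the package's own waiting logic.

```go
package main

import (
	"fmt"
	"time"
)

const (
	shortDelay = 20 * time.Millisecond // arbitrary values for the sketch
	longDelay  = time.Hour
)

type BreakChan <-chan struct{}

type Delayed interface {
	GetDelay() time.Duration
}

type Breakout interface {
	Breaker() BreakChan
}

// task is a hypothetical Delayed item that can also be released early.
type task struct {
	delay time.Duration
	stop  chan struct{} // nil means "no early departure"
}

func (t *task) GetDelay() time.Duration { return t.delay }
func (t *task) Breaker() BreakChan      { return t.stop }

// waitFor blocks for d's delay. It returns true if d implements Breakout
// and its channel was closed before the delay expired.
func waitFor(d Delayed) bool {
	var breaker BreakChan // nil blocks forever, so the full delay is waited
	if b, ok := d.(Breakout); ok {
		breaker = b.Breaker()
	}
	timer := time.NewTimer(d.GetDelay())
	defer timer.Stop()
	select {
	case <-breaker:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	full := &task{delay: shortDelay} // nil stop channel
	fmt.Println(waitFor(full))       // false: waited the full delay

	early := &task{delay: longDelay, stop: make(chan struct{})}
	close(early.stop)
	fmt.Println(waitFor(early)) // true: released before the delay expired
}
```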

type Copyable

type Copyable interface {
	// return an independent copy (deep clone) of the current object
	Copy() Copyable
}

type DeadlinePolicy

type DeadlinePolicy int

Decide whether a pre-existing deadline for an item in a delay-queue should be updated if an attempt is made to offer/add a new deadline for said item. Whether the deadline changes or not has zero impact on the data blob associated with the entry in the queue.

const (
	PreferLatest DeadlinePolicy = iota
	PreferEarliest
)

type Deadlined

type Deadlined interface {
	// when ok, returns the time when this object should be activated/executed/evaluated
	Deadline() (deadline time.Time, ok bool)
}
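As a sketch of the `ok` convention, the example below implements Deadlined on a hypothetical `job` type and converts the deadline into a remaining delay. `job`, `remaining`, `epoch`, and `grace` are illustrative names and do not come from the package.

```go
package main

import (
	"fmt"
	"time"
)

// Deadlined mirrors the interface shown above.
type Deadlined interface {
	Deadline() (deadline time.Time, ok bool)
}

// Fixed reference values keep the sketch deterministic.
var epoch = time.Unix(0, 0)

const grace = 90 * time.Second

// job is a hypothetical item whose deadline is optional.
type job struct {
	due *time.Time
}

func (j job) Deadline() (time.Time, bool) {
	if j.due == nil {
		return time.Time{}, false // no deadline set
	}
	return *j.due, true
}

// remaining returns how long after now the deadline falls. The second
// result is false when d reports no deadline.
func remaining(d Deadlined, now time.Time) (time.Duration, bool) {
	deadline, ok := d.Deadline()
	if !ok {
		return 0, false
	}
	return deadline.Sub(now), true
}

func main() {
	due := epoch.Add(grace)
	fmt.Println(remaining(job{due: &due}, epoch)) // 1m30s true
	fmt.Println(remaining(job{}, epoch))          // 0s false
}
```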

type DelayFIFO

type DelayFIFO struct {
	// contains filtered or unexported fields
}

If multiple adds/updates of a single item happen while an item is in the queue before it has been processed, it will only be processed once, and when it is processed, the most recent version will be processed. Items are popped in order of their priority, currently controlled by a delay or deadline assigned to each item in the queue.
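The "processed once, most recent version wins" behaviour can be modelled with a map keyed by ID. The sketch below is a simplified, non-concurrent illustration under stated assumptions: `dedupQueue` and its methods are hypothetical, a plain integer stands in for the deadline, and a repeated `put` simply overwrites both value and priority, which the real type may handle differently.

```go
package main

import (
	"fmt"
	"sort"
)

// entry pairs the latest version of an item with its priority
// (lower values are popped first; a stand-in for a deadline).
type entry struct {
	value    string
	priority int
}

// dedupQueue is a hypothetical model: at most one entry per ID.
type dedupQueue struct {
	items map[string]entry
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{items: map[string]entry{}}
}

// put adds the item, or overwrites the queued version if id is present.
func (q *dedupQueue) put(id, value string, priority int) {
	q.items[id] = entry{value: value, priority: priority}
}

// drain removes and returns all queued values in priority order.
func (q *dedupQueue) drain() []string {
	ids := make([]string, 0, len(q.items))
	for id := range q.items {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		return q.items[ids[i]].priority < q.items[ids[j]].priority
	})
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		out = append(out, q.items[id].value)
		delete(q.items, id)
	}
	return out
}

func main() {
	q := newDedupQueue()
	q.put("a", "a-v1", 2)
	q.put("b", "b-v1", 1)
	q.put("a", "a-v2", 2) // replaces a-v1 while it is still queued

	fmt.Println(q.drain()) // [b-v1 a-v2]
}
```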

func NewDelayFIFO

func NewDelayFIFO() *DelayFIFO

func (*DelayFIFO) Add

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*DelayFIFO) Await

func (q *DelayFIFO) Await(timeout time.Duration) UniqueID

Variant of DelayQueue.Pop() for UniqueDelayed items

func (*DelayFIFO) ContainedIDs

func (c *DelayFIFO) ContainedIDs() sets.String

ContainedIDs returns a sets.String containing the IDs of all stored items. The result is a snapshot of a moment in time; other goroutines can add or remove items after the call returns.

func (*DelayFIFO) Delete

func (f *DelayFIFO) Delete(id string)

Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not their priority order.

func (*DelayFIFO) Get

func (f *DelayFIFO) Get(id string) (UniqueID, bool)

Get returns the requested item, or sets exists=false.

func (*DelayFIFO) List

func (f *DelayFIFO) List() []UniqueID

List returns a list of all the items.

func (*DelayFIFO) Offer

func (*DelayFIFO) Pop

func (q *DelayFIFO) Pop(cancel <-chan struct{}) UniqueID

Pop blocks until either there is an item available to dequeue or else the specified cancel chan is closed. Callers that have no interest in providing a cancel chan should specify nil, or else WithoutCancel() (for readability).

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue is a concurrency-safe, deadline-oriented queue that returns items after their delay period has expired.

func NewDelayQueue

func NewDelayQueue() *DelayQueue

func (*DelayQueue) Add

func (q *DelayQueue) Add(d Delayed)

func (*DelayQueue) Offer

func (q *DelayQueue) Offer(d Deadlined) bool

If there's a deadline reported by d.Deadline() then `d` is added to the queue and this func returns true.

func (*DelayQueue) Pop

func (q *DelayQueue) Pop() interface{}

Pop waits for the delay of the next item in the queue to expire, blocking if there are no items in the queue. It does not guarantee first-come-first-served ordering with respect to clients.
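One common way to build a deadline-ordered queue is a min-heap keyed by deadline. The sketch below uses `container/heap` and is a single-goroutine illustration only: `delayHeap`, `item`, `short`, and `long` are hypothetical names, and unlike the documented Pop it returns false on an empty queue instead of blocking. Whether DelayQueue itself is built this way is an assumption.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

const (
	short = 5 * time.Millisecond // arbitrary delays for the sketch
	long  = 30 * time.Millisecond
)

type item struct {
	value    string
	deadline time.Time
}

// byDeadline implements heap.Interface with the earliest deadline at the root.
type byDeadline []item

func (h byDeadline) Len() int            { return len(h) }
func (h byDeadline) Less(i, j int) bool  { return h[i].deadline.Before(h[j].deadline) }
func (h byDeadline) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *byDeadline) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *byDeadline) Pop() interface{} {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// delayHeap is a hypothetical, non-concurrent model of a delay queue.
type delayHeap struct{ h byDeadline }

// add schedules value to become available after delay.
func (q *delayHeap) add(value string, delay time.Duration) {
	heap.Push(&q.h, item{value: value, deadline: time.Now().Add(delay)})
}

// pop sleeps until the earliest deadline has passed and returns that value.
// It reports false when the queue is empty rather than blocking.
func (q *delayHeap) pop() (string, bool) {
	if q.h.Len() == 0 {
		return "", false
	}
	next := heap.Pop(&q.h).(item)
	if wait := time.Until(next.deadline); wait > 0 {
		time.Sleep(wait)
	}
	return next.value, true
}

func main() {
	var q delayHeap
	q.add("slow", long)
	q.add("fast", short)

	fmt.Println(q.pop()) // fast true
	fmt.Println(q.pop()) // slow true
	fmt.Println(q.pop()) //  false
}
```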

type Delayed

type Delayed interface {
	// return the remaining delay; a non-positive value indicates no delay
	GetDelay() time.Duration
}

type Entry

type Entry interface {
	Copyable
	Value() UniqueCopyable
	// types is a logically OR'd combination of EventType, e.g. ADD_EVENT|UPDATE_EVENT
	Is(types EventType) bool
}

type EventType

type EventType int

const (
	ADD_EVENT EventType = 1 << iota
	UPDATE_EVENT
	DELETE_EVENT
	POP_EVENT
)
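Because each constant occupies its own bit, event types can be combined with `|` into a mask. The sketch below redeclares the constants locally; `matches` is a hypothetical helper, and the assumption that an `Is` implementation tests the mask with a bitwise AND is mine, not stated by the package.

```go
package main

import "fmt"

type EventType int

const (
	ADD_EVENT EventType = 1 << iota // 1
	UPDATE_EVENT                    // 2
	DELETE_EVENT                    // 4
	POP_EVENT                       // 8
)

// matches reports whether event is one of the types included in mask.
func matches(event, mask EventType) bool {
	return event&mask != 0
}

func main() {
	mask := ADD_EVENT | UPDATE_EVENT

	fmt.Println(matches(UPDATE_EVENT, mask)) // true
	fmt.Println(matches(DELETE_EVENT, mask)) // false
	fmt.Println(int(POP_EVENT))              // 8
}
```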

type FIFO

type FIFO interface {
	cache.Store

	// Pop waits until an item is ready and returns it. If multiple items are
	// ready, they are returned in the order in which they were added/updated.
	// The item is removed from the queue (and the store) before it is returned,
	// so if you don't successfully process it, you need to add it back with Add().
	Pop(cancel <-chan struct{}) interface{}

	// Await attempts to Pop within the given interval; upon success the non-nil
	// item is returned, otherwise nil
	Await(timeout time.Duration) interface{}

	// Is there an entry for the id that matches the event mask?
	Poll(id string, types EventType) bool
}
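The relationship between Pop and Await can be sketched by expressing Await as a Pop whose cancel channel is closed when the timeout expires. `chanQueue` and `shortWait` are hypothetical, and a buffered channel stands in for the real store. How the package's implementations relate the two methods internally is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait is an arbitrary timeout chosen for this sketch.
const shortWait = 10 * time.Millisecond

// chanQueue is a hypothetical queue backed by a buffered channel.
type chanQueue struct {
	items chan interface{}
}

// Pop blocks until an item is available or cancel is closed.
// A nil cancel channel never fires, so Pop then waits indefinitely.
func (q *chanQueue) Pop(cancel <-chan struct{}) interface{} {
	select {
	case v := <-q.items:
		return v
	case <-cancel:
		return nil
	}
}

// Await is Pop with a time limit: the cancel channel is closed on timeout.
func (q *chanQueue) Await(timeout time.Duration) interface{} {
	cancel := make(chan struct{})
	timer := time.AfterFunc(timeout, func() { close(cancel) })
	defer timer.Stop()
	return q.Pop(cancel)
}

func main() {
	q := &chanQueue{items: make(chan interface{}, 1)}
	q.items <- "a"

	fmt.Println(q.Await(shortWait)) // a
	fmt.Println(q.Await(shortWait)) // <nil>: nothing arrived in time
}
```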

type HistoricalFIFO

type HistoricalFIFO struct {
	// contains filtered or unexported fields
}

HistoricalFIFO receives adds and updates from a Reflector, and puts them in a queue for FIFO order processing. If multiple adds/updates of a single item happen while an item is in the queue before it has been processed, it will only be processed once, and when it is processed, the most recent version will be processed. This can't be done with a channel.

func NewHistorical

func NewHistorical(ch chan<- Entry) *HistoricalFIFO

NewHistorical returns a Store which can be used to queue up items to process. If a non-nil channel is provided, then modifications to the FIFO are delivered on that channel, which is specific to this FIFO.

func (*HistoricalFIFO) Add

func (f *HistoricalFIFO) Add(v interface{}) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*HistoricalFIFO) Await

func (q *HistoricalFIFO) Await(timeout time.Duration) interface{}

Await attempts to Pop within the given timeout; upon success the non-nil item is returned, otherwise nil.

func (*HistoricalFIFO) ContainedIDs

func (c *HistoricalFIFO) ContainedIDs() sets.String

ContainedIDs returns a sets.String containing the IDs of all stored items. The result is a snapshot of a moment in time; other goroutines can add or remove items after the call returns.

func (*HistoricalFIFO) Delete

func (f *HistoricalFIFO) Delete(v interface{}) error

Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.

func (*HistoricalFIFO) Get

func (f *HistoricalFIFO) Get(v interface{}) (interface{}, bool, error)

Get returns the requested item, or sets exists=false.

func (*HistoricalFIFO) GetByKey

func (f *HistoricalFIFO) GetByKey(id string) (interface{}, bool, error)

GetByKey returns the item stored under the given id, or sets exists=false.

func (*HistoricalFIFO) List

func (f *HistoricalFIFO) List() []interface{}

List returns a list of all the items.

func (*HistoricalFIFO) ListKeys

func (f *HistoricalFIFO) ListKeys() []string

ListKeys returns a list of the keys of all the items.

func (*HistoricalFIFO) Poll

func (f *HistoricalFIFO) Poll(id string, t EventType) bool

Poll reports whether there is an entry for the id that matches the event mask.

func (*HistoricalFIFO) Pop

func (f *HistoricalFIFO) Pop(cancel <-chan struct{}) interface{}

Pop blocks until either there is an item available to dequeue or else the specified cancel chan is closed. Callers that have no interest in providing a cancel chan should specify nil, or else WithoutCancel() (for readability).

func (*HistoricalFIFO) Replace

func (f *HistoricalFIFO) Replace(objs []interface{}, resourceVersion string) error

func (*HistoricalFIFO) Update

func (f *HistoricalFIFO) Update(obj interface{}) error

Update is the same as Add in this implementation.

type Priority

type Priority struct {
	// contains filtered or unexported fields
}

func (Priority) Equal

func (p Priority) Equal(other Priority) bool

type ReplacementPolicy

type ReplacementPolicy int

Decide whether a pre-existing data blob in a delay-queue should be replaced if an attempt is made to add/offer a new data blob in its place. Whether the data is replaced has no bearing on the deadline (priority) of the item in the queue.

const (
	KeepExisting ReplacementPolicy = iota
	ReplaceExisting
)
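The effect of the two policies on the stored data can be sketched with a small keyed container. The constants are redeclared locally; `store` and `offer` are hypothetical and model only the data-replacement decision, not deadlines or the package's actual bookkeeping.

```go
package main

import "fmt"

type ReplacementPolicy int

const (
	KeepExisting ReplacementPolicy = iota
	ReplaceExisting
)

// store is a hypothetical keyed container that applies a ReplacementPolicy.
type store struct {
	policy ReplacementPolicy
	data   map[string]string
}

func newStore(p ReplacementPolicy) *store {
	return &store{policy: p, data: map[string]string{}}
}

// offer records value under id. If id is already present, the policy decides
// whether the old value survives. It returns the value stored afterwards.
func (s *store) offer(id, value string) string {
	if _, exists := s.data[id]; !exists || s.policy == ReplaceExisting {
		s.data[id] = value
	}
	return s.data[id]
}

func main() {
	keep := newStore(KeepExisting)
	keep.offer("x", "old")
	fmt.Println(keep.offer("x", "new")) // old

	replace := newStore(ReplaceExisting)
	replace.offer("x", "old")
	fmt.Println(replace.offer("x", "new")) // new
}
```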

type UniqueCopyable

type UniqueCopyable interface {
	Copyable
	UniqueID
}
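A type satisfies UniqueCopyable by providing both `Copy` and `GetUID`. In the sketch below the interfaces are redeclared locally and `pod` is a hypothetical item type. The slice field is copied explicitly so that the clone is independent of the original, as the Copyable comment requires.

```go
package main

import "fmt"

type Copyable interface {
	Copy() Copyable
}

type UniqueID interface {
	GetUID() string
}

type UniqueCopyable interface {
	Copyable
	UniqueID
}

// pod is a hypothetical item; labels is a slice, so a shallow copy would
// share its backing array with the original.
type pod struct {
	uid    string
	labels []string
}

func (p *pod) GetUID() string { return p.uid }

// Copy returns a deep clone: the labels slice is duplicated, not shared.
func (p *pod) Copy() Copyable {
	return &pod{uid: p.uid, labels: append([]string(nil), p.labels...)}
}

// Compile-time check that *pod satisfies the combined interface.
var _ UniqueCopyable = (*pod)(nil)

func main() {
	orig := &pod{uid: "pod-1", labels: []string{"blue"}}
	clone := orig.Copy().(*pod)
	clone.labels[0] = "green"

	fmt.Println(orig.labels[0], clone.labels[0]) // blue green
	fmt.Println(clone.GetUID())                  // pod-1
}
```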

type UniqueDeadlined

type UniqueDeadlined interface {
	UniqueID
	Deadlined
}

type UniqueDelayed

type UniqueDelayed interface {
	UniqueID
	Delayed
}

type UniqueID

type UniqueID interface {
	GetUID() string
}
