watch

package
v0.31.0
Published: Jul 19, 2024 License: Apache-2.0 Imports: 8 Imported by: 49,067

Documentation

Overview

Package watch contains a generic watchable interface, and a fake for testing code that uses the watch interface.

Index

Constants

This section is empty.

Variables

var (
	DefaultChanSize int32 = 100
)

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster distributes event notifications among any number of watchers. Every event is delivered to every watcher.

func NewBroadcaster

func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster

NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. It is guaranteed that events will be distributed in the order in which they occur, but the order in which a single event is distributed among all of the watchers is unspecified.
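The delivery guarantee described above (every watcher sees every event, in the order the events occurred) can be pictured with a small sketch. It is not the package's implementation: `Event` is a local stand-in with the object simplified to a string, and `fanOut` and `drain` are hypothetical helpers, so that the example compiles with the standard library alone.

```go
package main

import "fmt"

// EventType and Event are local stand-ins that mirror the shapes documented
// for this package; Object is simplified to a string for the sketch.
type EventType string

type Event struct {
	Type   EventType
	Object string
}

// fanOut is a hypothetical helper, not the package's implementation. It
// copies every event, in order, into each watcher's buffered queue.
func fanOut(events []Event, watchers []chan Event) {
	for _, ev := range events { // the outer loop preserves event order
		for _, w := range watchers { // the real package leaves this inner order unspecified
			w <- ev
		}
	}
	for _, w := range watchers {
		close(w)
	}
}

// drain collects everything a watcher received until its channel is closed.
func drain(w <-chan Event) []Event {
	var got []Event
	for ev := range w {
		got = append(got, ev)
	}
	return got
}

func main() {
	events := []Event{{"ADDED", "a"}, {"MODIFIED", "a"}, {"DELETED", "a"}}
	// The queue length equals len(events), so the sends never block here.
	w1 := make(chan Event, len(events))
	w2 := make(chan Event, len(events))
	fanOut(events, []chan Event{w1, w2})
	fmt.Println(drain(w1))
	fmt.Println(drain(w2))
}
```

Both watchers print the same three events in the same order, which is the property the queueLength parameter and the ordering guarantee are about.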

func NewLongQueueBroadcaster added in v0.21.0

func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster

NewLongQueueBroadcaster functions nearly identically to NewBroadcaster, except that the incoming queue is the same size as the outgoing queues (specified by queueLength).

func (*Broadcaster) Action

func (m *Broadcaster) Action(action EventType, obj runtime.Object) error

Action distributes the given event among all watchers.

func (*Broadcaster) ActionOrDrop added in v0.21.0

func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error)

ActionOrDrop distributes the given event among all watchers, or drops it if too many incoming actions are already queued. It returns true if the action was sent and false if it was dropped.

func (*Broadcaster) Shutdown

func (m *Broadcaster) Shutdown()

Shutdown disconnects all watchers, although any events already queued are still distributed. You must not call Action or Watch* after calling Shutdown. The call blocks until the broadcaster's stop request has actually been executed and all events have been distributed through the outbound channels. Because those channels can be buffered, watchers may not have received the data yet: it can still be sitting in a buffered channel.

func (*Broadcaster) Watch

func (m *Broadcaster) Watch() (Interface, error)

Watch adds a new watcher to the list and returns an Interface for it. Note: new watchers will only receive new events. They won't get an entire history of previous events. It will block until the watcher is actually added to the broadcaster.

func (*Broadcaster) WatchWithPrefix

func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error)

WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends queuedEvents down the new watch before beginning to send ordinary events from Broadcaster. The returned watch will have a queue length that is at least large enough to accommodate all of the items in queuedEvents. It will block until the watcher is actually added to the broadcaster.

type Decoder

type Decoder interface {
	// Decode should return the type of event, the decoded object, or an error.
	// An error will cause StreamWatcher to call Close(). Decode should block until
	// it has data or an error occurs.
	Decode() (action EventType, object runtime.Object, err error)

	// Close should close the underlying io.Reader, signalling to the source of
	// the stream that it is no longer being watched. Close() must cause any
	// outstanding call to Decode() to return with an error of some sort.
	Close()
}

Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
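As an illustration of the contract above, here is a minimal sketch of a Decoder for an invented wire format (one event per line, written as `<TYPE> <object>`). The `Decoder` interface below is a local stand-in with runtime.Object simplified to a string, and `lineDecoder`, `newLineDecoder`, and `fromString` are hypothetical names; none of them come from this package.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

type EventType string

// Decoder mirrors the documented interface, with runtime.Object simplified
// to a string so the sketch needs only the standard library.
type Decoder interface {
	Decode() (action EventType, object string, err error)
	Close()
}

// lineDecoder is a hypothetical Decoder for a made-up line-based format.
type lineDecoder struct {
	src     io.ReadCloser
	scanner *bufio.Scanner
}

func newLineDecoder(src io.ReadCloser) *lineDecoder {
	return &lineDecoder{src: src, scanner: bufio.NewScanner(src)}
}

// fromString is a convenience constructor for demos and tests.
func fromString(s string) *lineDecoder {
	return newLineDecoder(io.NopCloser(strings.NewReader(s)))
}

// Decode blocks until a full line is available, then splits it into the
// event type and the object. It returns io.EOF once the stream ends.
func (d *lineDecoder) Decode() (EventType, string, error) {
	if !d.scanner.Scan() {
		if err := d.scanner.Err(); err != nil {
			return "", "", err
		}
		return "", "", io.EOF
	}
	typ, obj, ok := strings.Cut(d.scanner.Text(), " ")
	if !ok {
		return "", "", fmt.Errorf("malformed line %q", d.scanner.Text())
	}
	return EventType(typ), obj, nil
}

// Close closes the underlying reader. For a network stream, closing the
// reader is typically what makes a blocked Decode return with an error.
func (d *lineDecoder) Close() { d.src.Close() }

func main() {
	var d Decoder = fromString("ADDED pod-a\nDELETED pod-a\n")
	defer d.Close()
	for {
		typ, obj, err := d.Decode()
		if err != nil {
			break // io.EOF here; a stream watcher would stop on any error
		}
		fmt.Println(typ, obj)
	}
}
```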

type Event

type Event struct {
	Type EventType

	// Object is:
	//  * If Type is Added or Modified: the new state of the object.
	//  * If Type is Deleted: the state of the object immediately before deletion.
	//  * If Type is Bookmark: the object (instance of a type being watched) where
	//    only ResourceVersion field is set. On successful restart of watch from a
	//    bookmark resourceVersion, client is guaranteed to not get repeat event
	//    nor miss any events.
	//  * If Type is Error: *api.Status is recommended; other types may make sense
	//    depending on context.
	Object runtime.Object
}

Event represents a single event to a watched resource.

func (*Event) DeepCopy

func (in *Event) DeepCopy() *Event

DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Event.

func (*Event) DeepCopyInto

func (in *Event) DeepCopyInto(out *Event)

DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.

type EventType

type EventType string

EventType defines the possible types of events.

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"
)
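The per-type meaning of Event.Object listed above usually turns into a switch on the consumer side. The following is a minimal sketch under stated assumptions: `object` is a hypothetical stand-in for runtime.Object, `store` and `apply` are illustrative names, and the types are redeclared locally so the example compiles with the standard library alone.

```go
package main

import "fmt"

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"
)

// object is a hypothetical stand-in for runtime.Object.
type object struct {
	Name            string
	ResourceVersion string
	Spec            string
}

type Event struct {
	Type   EventType
	Object object
}

// store is a hypothetical consumer-side cache keyed by object name.
type store struct {
	items       map[string]object
	lastVersion string // where a restarted watch would resume from
}

// apply updates the cache according to the Object semantics listed above.
func (s *store) apply(ev Event) error {
	switch ev.Type {
	case Added, Modified:
		s.items[ev.Object.Name] = ev.Object // new state of the object
	case Deleted:
		delete(s.items, ev.Object.Name) // Object is the state just before deletion
	case Bookmark:
		// Only ResourceVersion is meaningful; there is nothing to store.
	case Error:
		return fmt.Errorf("watch error event: %v", ev.Object)
	default:
		return fmt.Errorf("unknown event type %q", ev.Type)
	}
	s.lastVersion = ev.Object.ResourceVersion
	return nil
}

func main() {
	s := &store{items: map[string]object{}}
	events := []Event{
		{Added, object{Name: "pod-a", ResourceVersion: "1", Spec: "v1"}},
		{Modified, object{Name: "pod-a", ResourceVersion: "2", Spec: "v2"}},
		{Bookmark, object{ResourceVersion: "5"}},
		{Deleted, object{Name: "pod-a", ResourceVersion: "6", Spec: "v2"}},
	}
	for _, ev := range events {
		if err := s.apply(ev); err != nil {
			fmt.Println("stopping:", err)
			return
		}
	}
	fmt.Println(len(s.items), s.lastVersion) // prints "0 6"
}
```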

type FakeWatcher

type FakeWatcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

FakeWatcher lets you test anything that consumes a watch.Interface. It is thread-safe.

func NewFake

func NewFake() *FakeWatcher

func NewFakeWithChanSize

func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher

func (*FakeWatcher) Action

func (f *FakeWatcher) Action(action EventType, obj runtime.Object)

Action sends an event of the requested type, for table-based testing.

func (*FakeWatcher) Add

func (f *FakeWatcher) Add(obj runtime.Object)

Add sends an add event.

func (*FakeWatcher) Delete

func (f *FakeWatcher) Delete(lastValue runtime.Object)

Delete sends a delete event.

func (*FakeWatcher) Error

func (f *FakeWatcher) Error(errValue runtime.Object)

Error sends an Error event.

func (*FakeWatcher) IsStopped

func (f *FakeWatcher) IsStopped() bool

func (*FakeWatcher) Modify

func (f *FakeWatcher) Modify(obj runtime.Object)

Modify sends a modify event.

func (*FakeWatcher) Reset

func (f *FakeWatcher) Reset()

Reset prepares the watcher to be reused.

func (*FakeWatcher) ResultChan

func (f *FakeWatcher) ResultChan() <-chan Event

func (*FakeWatcher) Stop

func (f *FakeWatcher) Stop()

Stop implements Interface.Stop().

type FilterFunc

type FilterFunc func(in Event) (out Event, keep bool)

FilterFunc should take an event, possibly modify it in some way, and return the modified event. If the event should be ignored, then return keep=false.
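A function with this signature might look like the sketch below, which drops BOOKMARK events and blanks one field on everything else. The `object` type, its `Token` field, and the `redact` and `applyFilter` helpers are hypothetical; the types are redeclared locally so the example compiles with the standard library alone.

```go
package main

import "fmt"

type EventType string

// object is a hypothetical stand-in for runtime.Object.
type object struct {
	Name  string
	Token string
}

type Event struct {
	Type   EventType
	Object object
}

// FilterFunc has the signature documented above.
type FilterFunc func(in Event) (out Event, keep bool)

// redact drops BOOKMARK events and blanks the Token field of everything else.
func redact(in Event) (Event, bool) {
	if in.Type == "BOOKMARK" {
		return Event{}, false
	}
	// in is a copy here because object is a plain struct. A real
	// runtime.Object is usually a pointer and would need a deep copy first.
	in.Object.Token = ""
	return in, true
}

// applyFilter is a hypothetical helper that runs f over a slice of events
// and keeps only those for which f returns keep=true.
func applyFilter(events []Event, f FilterFunc) []Event {
	var out []Event
	for _, ev := range events {
		if filtered, keep := f(ev); keep {
			out = append(out, filtered)
		}
	}
	return out
}

func main() {
	in := []Event{
		{"ADDED", object{Name: "sa-1", Token: "s3cret"}},
		{"BOOKMARK", object{}},
	}
	for _, ev := range applyFilter(in, redact) {
		fmt.Printf("%s %s token=%q\n", ev.Type, ev.Object.Name, ev.Object.Token)
	}
}
```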

type FullChannelBehavior

type FullChannelBehavior int

FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch channel is full.

const (
	WaitIfChannelFull FullChannelBehavior = iota
	DropIfChannelFull
)
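The difference between the two behaviors comes down to whether a send on a full channel blocks or is skipped. The sketch below shows that choice for a single watcher channel. `deliver` is a hypothetical helper, not part of this package, and events are simplified to strings.

```go
package main

import "fmt"

type FullChannelBehavior int

const (
	WaitIfChannelFull FullChannelBehavior = iota
	DropIfChannelFull
)

// deliver is a hypothetical helper that sends ev on one watcher's channel
// and reports whether the event was queued.
func deliver(ch chan<- string, ev string, behavior FullChannelBehavior) bool {
	if behavior == DropIfChannelFull {
		select {
		case ch <- ev:
			return true
		default:
			return false // queue full: the event is dropped for this watcher
		}
	}
	ch <- ev // WaitIfChannelFull: block until the watcher makes room
	return true
}

func main() {
	ch := make(chan string, 1) // a watcher with queue length 1

	fmt.Println(deliver(ch, "first", DropIfChannelFull))  // true: the queue has room
	fmt.Println(deliver(ch, "second", DropIfChannelFull)) // false: the queue is full

	// With WaitIfChannelFull the sender blocks until a receiver frees a slot.
	received := make(chan string)
	go func() {
		received <- <-ch // takes "first" and frees the slot
	}()
	fmt.Println(deliver(ch, "third", WaitIfChannelFull)) // true, after waiting
	fmt.Println(<-received, <-ch)                        // first third
}
```

WaitIfChannelFull trades throughput for completeness: one slow watcher can stall delivery to the others, whereas DropIfChannelFull keeps the producer moving at the cost of lost events.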

type Interface

type Interface interface {
	// Stop tells the producer that the consumer is done watching, so the
	// producer should stop sending events and close the result channel. The
	// consumer should keep watching for events until the result channel is
	// closed.
	//
	// Because some implementations may create channels when constructed, Stop
	// must always be called, even if the consumer has not yet called
	// ResultChan().
	//
	// Only the consumer should call Stop(), not the producer. If the producer
	// errors and needs to stop the watch prematurely, it should instead send
	// an error event and close the result channel.
	Stop()

	// ResultChan returns a channel which will receive events from the event
	// producer. If an error occurs or Stop() is called, the producer must
	// close this channel and release any resources used by the watch.
	// Closing the result channel tells the consumer that no more events will be
	// sent.
	ResultChan() <-chan Event
}

Interface can be implemented by anything that knows how to watch and report changes.
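The consumer side of this contract (always call Stop, and keep reading until the result channel is closed) can be sketched as follows. `Interface` and `Event` are redeclared locally so the example compiles with the standard library alone; `replayWatcher` and `consume` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	Type   string
	Object string
}

// Interface mirrors the documented method set.
type Interface interface {
	Stop()
	ResultChan() <-chan Event
}

// replayWatcher is a hypothetical producer that replays a fixed event list.
type replayWatcher struct {
	result chan Event
	stop   chan struct{}
	once   sync.Once
}

func newReplayWatcher(events []Event) *replayWatcher {
	w := &replayWatcher{result: make(chan Event), stop: make(chan struct{})}
	go func() {
		defer close(w.result) // the producer, not the consumer, closes the channel
		for _, ev := range events {
			select {
			case w.result <- ev:
			case <-w.stop:
				return
			}
		}
	}()
	return w
}

func (w *replayWatcher) Stop()                    { w.once.Do(func() { close(w.stop) }) }
func (w *replayWatcher) ResultChan() <-chan Event { return w.result }

// consume keeps at most limit events (limit <= 0 means no limit). After
// asking the producer to stop, it keeps draining until the channel closes.
func consume(w Interface, limit int) []Event {
	defer w.Stop() // always call Stop; repeating it is safe for this stand-in
	var got []Event
	stopped := false
	for ev := range w.ResultChan() {
		if stopped {
			continue // discard anything still in flight
		}
		got = append(got, ev)
		if len(got) == limit {
			w.Stop()
			stopped = true
		}
	}
	return got
}

func main() {
	events := []Event{{"ADDED", "a"}, {"MODIFIED", "a"}, {"DELETED", "a"}}
	fmt.Println(consume(newReplayWatcher(events), 0))
	fmt.Println(consume(newReplayWatcher(events), 2))
}
```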

func Filter

func Filter(w Interface, f FilterFunc) Interface

Filter passes all events through f before allowing them to pass on. Because of the way Go channels work, putting a filter on a watch has the unavoidable side effect of increasing the queue length of the watch's event channel by one.

WARNING: Filter has a fatal flaw: it cannot properly update the Type field (Added/Modified/Deleted) to reflect items that begin to pass the filter when they previously did not.
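The flaw is easiest to see with a concrete sequence. In the sketch below, an object first fails the filter and later passes it, so the consumer's first sighting of the object is typed MODIFIED rather than ADDED. The `pod` type, `onlyReady`, and `seenThrough` are hypothetical, and the types are local stand-ins so the example compiles with the standard library alone.

```go
package main

import "fmt"

type EventType string

// pod is a hypothetical stand-in for a watched object.
type pod struct {
	Name  string
	Ready bool
}

type Event struct {
	Type   EventType
	Object pod
}

type FilterFunc func(in Event) (out Event, keep bool)

// onlyReady keeps events whose object is currently Ready. It has no memory
// of earlier events, so it cannot rewrite Type when an object starts matching.
func onlyReady(in Event) (Event, bool) { return in, in.Object.Ready }

// seenThrough is a hypothetical helper that returns what a consumer sitting
// behind filter f would observe.
func seenThrough(f FilterFunc, events []Event) []Event {
	var out []Event
	for _, ev := range events {
		if filtered, keep := f(ev); keep {
			out = append(out, filtered)
		}
	}
	return out
}

func main() {
	upstream := []Event{
		{"ADDED", pod{"web", false}},   // does not pass the filter yet
		{"MODIFIED", pod{"web", true}}, // now passes, but is still typed MODIFIED
	}
	for _, ev := range seenThrough(onlyReady, upstream) {
		fmt.Println(ev.Type, ev.Object.Name) // prints "MODIFIED web"
	}
}
```

A consumer that builds state only from ADDED events would never learn about this object, which is why the warning calls the flaw fatal.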

func NewEmptyWatch

func NewEmptyWatch() Interface

NewEmptyWatch returns a watch interface that returns no results and is closed. May be used in certain error conditions where no information is available but an error is not warranted.

type MockWatcher added in v0.31.0

type MockWatcher struct {
	StopFunc       func()
	ResultChanFunc func() <-chan Event
}

MockWatcher implements watch.Interface with mockable functions.

func (MockWatcher) ResultChan added in v0.31.0

func (mw MockWatcher) ResultChan() <-chan Event

ResultChan calls ResultChanFunc.

func (MockWatcher) Stop added in v0.31.0

func (mw MockWatcher) Stop()

Stop calls StopFunc.
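Because both methods delegate to function fields, a test can count calls or supply a prepared channel. The sketch below redeclares `MockWatcher` locally with the documented fields and the delegation described above, so that it compiles with the standard library alone; `firstEvent` is a hypothetical function under test and Object is simplified to a string.

```go
package main

import "fmt"

type EventType string

type Event struct {
	Type   EventType
	Object string
}

type Interface interface {
	Stop()
	ResultChan() <-chan Event
}

// MockWatcher is a local copy of the documented struct; the methods simply
// delegate to the function fields.
type MockWatcher struct {
	StopFunc       func()
	ResultChanFunc func() <-chan Event
}

func (mw MockWatcher) Stop()                    { mw.StopFunc() }
func (mw MockWatcher) ResultChan() <-chan Event { return mw.ResultChanFunc() }

// firstEvent is a hypothetical function under test: it reads one event and
// then stops the watch.
func firstEvent(w Interface) (Event, bool) {
	defer w.Stop()
	ev, ok := <-w.ResultChan()
	return ev, ok
}

func main() {
	ch := make(chan Event, 1)
	ch <- Event{Type: "ADDED", Object: "pod-a"}

	stops := 0
	mock := MockWatcher{
		StopFunc:       func() { stops++ },
		ResultChanFunc: func() <-chan Event { return ch },
	}

	ev, ok := firstEvent(mock)
	fmt.Println(ev.Type, ev.Object, ok, stops) // prints "ADDED pod-a true 1"
}
```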

type ProxyWatcher

type ProxyWatcher struct {
	// contains filtered or unexported fields
}

ProxyWatcher lets you wrap your channel in a watch.Interface. It is thread-safe.

func NewProxyWatcher

func NewProxyWatcher(ch chan Event) *ProxyWatcher

NewProxyWatcher creates a new ProxyWatcher by wrapping a channel.

func (*ProxyWatcher) ResultChan

func (pw *ProxyWatcher) ResultChan() <-chan Event

ResultChan implements Interface.

func (*ProxyWatcher) Stop

func (pw *ProxyWatcher) Stop()

Stop implements Interface.

func (*ProxyWatcher) StopChan

func (pw *ProxyWatcher) StopChan() <-chan struct{}

StopChan returns the stop channel.

func (*ProxyWatcher) Stopping

func (pw *ProxyWatcher) Stopping() bool

Stopping returns true if Stop() has been called.

type RaceFreeFakeWatcher

type RaceFreeFakeWatcher struct {
	Stopped bool
	sync.Mutex
	// contains filtered or unexported fields
}

RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface. It is thread-safe.

func NewRaceFreeFake

func NewRaceFreeFake() *RaceFreeFakeWatcher

func (*RaceFreeFakeWatcher) Action

func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object)

Action sends an event of the requested type, for table-based testing.

func (*RaceFreeFakeWatcher) Add

func (f *RaceFreeFakeWatcher) Add(obj runtime.Object)

Add sends an add event.

func (*RaceFreeFakeWatcher) Delete

func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object)

Delete sends a delete event.

func (*RaceFreeFakeWatcher) Error

func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object)

Error sends an Error event.

func (*RaceFreeFakeWatcher) IsStopped

func (f *RaceFreeFakeWatcher) IsStopped() bool

func (*RaceFreeFakeWatcher) Modify

func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object)

Modify sends a modify event.

func (*RaceFreeFakeWatcher) Reset

func (f *RaceFreeFakeWatcher) Reset()

Reset prepares the watcher to be reused.

func (*RaceFreeFakeWatcher) ResultChan

func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event

func (*RaceFreeFakeWatcher) Stop

func (f *RaceFreeFakeWatcher) Stop()

Stop implements Interface.Stop().

type Recorder

type Recorder struct {
	Interface
	// contains filtered or unexported fields
}

Recorder records all events that are sent from the watch until it is closed.

func NewRecorder

func NewRecorder(w Interface) *Recorder

NewRecorder wraps an Interface and records any changes sent across it.

func (*Recorder) Events

func (r *Recorder) Events() []Event

Events returns a copy of the events sent across this recorder.

type Reporter

type Reporter interface {
	// AsObject must convert err into a valid runtime.Object for the watch stream.
	AsObject(err error) runtime.Object
}

Reporter hides the details of how an error is turned into a runtime.Object for reporting on a watch stream, since this package may not import a higher-level report.

type StreamWatcher

type StreamWatcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

StreamWatcher turns any stream for which you can write a Decoder interface into a watch.Interface.

func NewStreamWatcher

func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher

NewStreamWatcher creates a StreamWatcher from the given decoder.

func (*StreamWatcher) ResultChan

func (sw *StreamWatcher) ResultChan() <-chan Event

ResultChan implements Interface.

func (*StreamWatcher) Stop

func (sw *StreamWatcher) Stop()

Stop implements Interface.
