Documentation ¶
Overview ¶
Package framework implements all the grunt work involved in running a simple controller.
Example ¶
package main import ( "fmt" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func main() { // source simulates an apiserver object endpoint. source := framework.NewFakeControllerSource() // This will hold the downstream state, as we know it. downstream := cache.NewStore(cache.MetaNamespaceKeyFunc) // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream) // Let's do threadsafe output to get predictable test results. outputSetLock := sync.Mutex{} outputSet := util.StringSet{} cfg := &framework.Config{ Queue: fifo, ListerWatcher: source, ObjectType: &api.Pod{}, FullResyncPeriod: time.Millisecond * 100, RetryOnError: false, // Let's implement a simple controller that just deletes // everything that comes in. Process: func(obj interface{}) error { // Obj is from the Pop method of the Queue we make above. newest := obj.(cache.Deltas).Newest() if newest.Type != cache.Deleted { // Update our downstream store. err := downstream.Add(newest.Object) if err != nil { return err } source.Delete(newest.Object.(runtime.Object)) } else { // Update our downstream store. err := downstream.Delete(newest.Object) if err != nil { return err } // fifo's KeyOf is easiest, because it handles // DeletedFinalStateUnknown markers. key, err := fifo.KeyOf(newest.Object) if err != nil { return err } // Record some output. outputSetLock.Lock() defer outputSetLock.Unlock() outputSet.Insert(key) } return nil }, } // Create the controller and run it until we close stop. stop := make(chan struct{}) framework.New(cfg).Run(stop) // Let's add a few objects to the source. for _, name := range []string{"a-hello", "b-controller", "c-framework"} { // Note that these pods are not valid-- the fake source doesn't // call validation or anything. source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}}) } // Let's wait for the controller to process the things we just added. time.Sleep(500 * time.Millisecond) close(stop) outputSetLock.Lock() for _, key := range outputSet.List() { fmt.Println(key) } }
Output: a-hello b-controller c-framework
Index ¶
- type Config
- type Controller
- type FakeControllerSource
- func (f *FakeControllerSource) Add(obj runtime.Object)
- func (f *FakeControllerSource) Delete(lastValue runtime.Object)
- func (f *FakeControllerSource) List() (runtime.Object, error)
- func (f *FakeControllerSource) Modify(obj runtime.Object)
- func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error)
- type ProcessFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // The queue for your objects; either a cache.FIFO or // a cache.DeltaFIFO. Your Process() function should accept // the output of this Oueue's Pop() method. cache.Queue // Something that can list and watch your objects. cache.ListerWatcher // Something that can process your objects. Process ProcessFunc // The type of your objects. ObjectType runtime.Object // Reprocess everything at least this often. // Note that if it takes longer for you to clear the queue than this // period, you will end up processing items in the order determined // by cache.FIFO.Replace(). Currently, this is random. If this is a // problem, we can change that replacement policy to append new // things to the end of the queue instead of replacing the entire // queue. FullResyncPeriod time.Duration // If true, when Process() returns an error, re-enqueue the object. // TODO: add interface to let you inject a delay/backoff or drop // the object completely if desired. Pass the object in // question to this interface as a parameter. RetryOnError bool }
Config contains all the settings for a Controller.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is a generic controller framework.
func (*Controller) Run ¶
func (c *Controller) Run(stopCh <-chan struct{})
Run begins processing items, and will continue until a value is sent down stopCh. It's an error to call Run more than once. Run does not block.
type FakeControllerSource ¶
type FakeControllerSource struct {
// contains filtered or unexported fields
}
FakeControllerSource implements listing/watching for testing.
func NewFakeControllerSource ¶
func NewFakeControllerSource() *FakeControllerSource
func (*FakeControllerSource) Add ¶
func (f *FakeControllerSource) Add(obj runtime.Object)
Add adds an object to the set and sends an add event to watchers. obj's ResourceVersion is set.
func (*FakeControllerSource) Delete ¶
func (f *FakeControllerSource) Delete(lastValue runtime.Object)
Delete deletes an object from the set and sends a delete event to watchers. obj's ResourceVersion is set.
func (*FakeControllerSource) List ¶
func (f *FakeControllerSource) List() (runtime.Object, error)
List returns a list object, with its resource version set.
func (*FakeControllerSource) Modify ¶
func (f *FakeControllerSource) Modify(obj runtime.Object)
Modify updates an object in the set and sends a modified event to watchers. obj's ResourceVersion is set.
type ProcessFunc ¶
type ProcessFunc func(obj interface{}) error
ProcessFunc processes a single object.