Documentation ¶
Index ¶
- Variables
- type Config
- type Controller
- type Error
- type EventHandler
- type EventHandlerFunc
- type EventStream
- type EventStreamFunc
- type Logger
- type NoopLogger
- func (n NoopLogger) Debug(msg string, keyValues ...interface{})
- func (n NoopLogger) Error(msg string, keyValues ...interface{})
- func (n NoopLogger) Info(msg string, keyValues ...interface{})
- func (n NoopLogger) Warn(msg string, keyValues ...interface{})
- func (n NoopLogger) With(keyValues ...interface{}) Logger
- type Observability
- type Reconciler
- type ReconcilerFunc
- type Result
- type WorkerHasher
- type WorkerHasherFunc
Constants ¶
This section is empty.
Variables ¶
var DefaultHasher = WorkerHasherFunc(func(_ context.Context, id string, count int) (int, error) { if count == 1 { return 0, nil } algorithm := fnv.New32a() algorithm.Write([]byte(id)) return int(algorithm.Sum32() % uint32(count)), nil })
DefaultHasher a WorkerHasher which hashes the id and return `hash % count`.
var NoopStream = EventStreamFunc(func(ctx context.Context, handler EventHandler) error { <-ctx.Done() return nil })
NoopStream a stream that does nothing
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // MaxItemRetries the number of times an item gets retried before dropping it MaxItemRetries int // WorkerQueueSize the size of the worker queue (outstanding reconciles) WorkerQueueSize int // WorkerHasher the function to assign work between workers WorkerHasher WorkerHasher // WorkerCount the number of workers WorkerCount int // LeaderElectionEnabled whether or not we should use LeaderElectionEnabled bool // DelayResolution the lowest possible time for a delay retry DelayResolution time.Duration // DelayQueueSize the maximum number of items in the scheduled delay queue DelayQueueSize int // MaxReconcileTime the maximum time a handle of an item should take MaxReconcileTime time.Duration // Observability configuration for logs, metrics and traces Observability Observability }
Config use to configure a controller.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig a good set of configuration to get started
type Controller ¶
type Controller interface { // Run execute the control-loop until the context is cancelled Run(ctx context.Context) error // BecomeLeader notify that this controller is now leader and that it should start the control-loop BecomeLeader() }
Controller the core interface to define a control-loop
func New ¶
func New(config Config, reconciler Reconciler, streams map[string]EventStream) Controller
New create a new controller
type Error ¶
type Error interface { error // RetryDelay how long to wait before adding back in the queue RetryDelay() time.Duration }
Error an error that has a custom retry delay.
type EventHandler ¶
EventHandler called whenever an event is triggered
func MeteredEventHandler ¶
func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) (EventHandler, error)
MeteredEventHandler adds metrics any event reconciler
type EventHandlerFunc ¶
EventHandlerFunc see EventHandler
type EventStream ¶
type EventStream interface {
Subscribe(ctx context.Context, handler EventHandler) error
}
EventStream calls `reconciler` whenever a new event is triggered. Examples of EventStreams are: "KafkaConsumers", "PubSub systems", "Nomad event stream". It's usually a way to signal that an external change happened and that we should rerun the control loop for the element with a given id.
func ResyncLoopEventStream ¶
func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn func(ctx context.Context) ([]string, error)) (EventStream, error)
ResyncLoopEventStream an EventStream that calls `listFn` every `duration` interval. This is used for rerunning the control-loop for all entities periodically. Having one of these is recommended for any controller.
type EventStreamFunc ¶
type EventStreamFunc func(ctx context.Context, handler EventHandler) error
EventStreamFunc see EventStream
func (EventStreamFunc) Subscribe ¶
func (f EventStreamFunc) Subscribe(ctx context.Context, handler EventHandler) error
Subscribe calls f(ctx, handler)
type Logger ¶
type Logger interface { // With create a new logger with fixed keys With(keyValues ...interface{}) Logger // Debug log at debug level Debug(msg string, keyValues ...interface{}) // Info log at debug level Info(msg string, keyValues ...interface{}) // Warn log at debug level Warn(msg string, keyValues ...interface{}) // Error log at debug level Error(msg string, keyValues ...interface{}) }
Logger a wrapper for your logger implementation
type NoopLogger ¶
type NoopLogger struct{}
NoopLogger a logger that does nothing
func (NoopLogger) Debug ¶
func (n NoopLogger) Debug(msg string, keyValues ...interface{})
Debug noop
func (NoopLogger) Error ¶
func (n NoopLogger) Error(msg string, keyValues ...interface{})
Error noop
type Observability ¶
Observability holds everything needed for instrumenting the reconciler code
func DefaultObservability ¶
func DefaultObservability() Observability
DefaultObservability uses noopLogger and otel.GetMeter and otel.GetTracer
func NewObservability ¶
func NewObservability(l Logger, m metric.MeterProvider, t trace.TracerProvider) Observability
NewObservability create a new observability wraooer (usually easier to use DefaultObservability)
func (Observability) LoggerWithCtx ¶
func (o Observability) LoggerWithCtx(ctx context.Context) Logger
LoggerWithCtx add the tracing context to the logger
type Reconciler ¶
type Reconciler interface { // Apply handle the item and potentially return an error Apply(ctx context.Context, id string) Result }
Reconciler is the core implementation of the control-loop.
type ReconcilerFunc ¶
ReconcilerFunc see Reconciler
type Result ¶
type Result struct { // RequeueDelay the time to wait before requeing, ignored is Error is not nil RequeueDelay time.Duration // Error the error Error error }
Result a wrapper that is returned by a Reconciler.
type WorkerHasher ¶
type WorkerHasher interface { // Route decide on which worker this item will go (return a value < 0 to drop this item), count is the number of items Route(ctx context.Context, id string, count int) (int, error) }
WorkerHasher specifies which of the control-loop workers should handle this specific item.