reconciler

package
v1.16.0-pre.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LabelModuleId  = "module_id"
	LabelOperation = "op"

	OpUpdate = "update"
	OpDelete = "delete"
	OpPrune  = "prune"
)

Variables

View Source
var Cell = cell.Module(
	"reconciler",
	"Shared metrics for the reconcilers",

	metrics.Metric(newMetrics),
)

Cell provides shared objects used by all reconciler instances. Currently it provides only the Metrics object.

Functions

func NewStatusIndex

func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind]

NewStatusIndex creates a status index for a table of reconcilable objects.

func Register

func Register[Obj comparable](p Params[Obj]) error

Register creates a new reconciler and registers to the application lifecycle. To be used with cell.Invoke when the API of the reconciler is not needed.

func WaitForReconciliation

func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], statusIndex statedb.Index[Obj, StatusKind]) error

WaitForReconciliation blocks until all objects have been reconciled or the context has cancelled.

Types

type BatchEntry

type BatchEntry[Obj any] struct {
	Object   Obj
	Revision statedb.Revision
	Result   error
}

type BatchOperations

type BatchOperations[Obj any] interface {
	UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []BatchEntry[Obj])
	DeleteBatch(context.Context, statedb.ReadTxn, []BatchEntry[Obj])
}

type Config

type Config[Obj any] struct {
	// FullReconcilationInterval is the amount of time to wait between full
	// reconciliation rounds. A full reconciliation is Prune() of unexpected
	// objects and Update() of all objects. With full reconciliation we're
	// resilient towards outside changes. If FullReconcilationInterval is
	// 0 then full reconciliation is disabled.
	FullReconcilationInterval time.Duration

	// RetryBackoffMinDuration is the minimum amount of time to wait before
	// retrying a failed Update() or Delete() operation on an object.
	// The retry wait time for an object will increase exponentially on
	// subsequent failures until RetryBackoffMaxDuration is reached.
	RetryBackoffMinDuration time.Duration

	// RetryBackoffMaxDuration is the maximum amount of time to wait before
	// retrying.
	RetryBackoffMaxDuration time.Duration

	// IncrementalRoundSize is the maximum number objects to reconcile during
	// incremental reconciliation before updating status and refreshing the
	// statedb snapshot. This should be tuned based on the cost of each operation
	// and the rate of expected changes so that health and per-object status
	// updates are not delayed too much. If in doubt, use a value between 100-1000.
	IncrementalRoundSize int

	// GetObjectStatus returns the reconciliation status for the object.
	GetObjectStatus func(Obj) Status

	// WithObjectStatus returns a COPY of the object with the status set to
	// the given value.
	WithObjectStatus func(Obj, Status) Obj

	// RateLimiter is optional and if set will use the limiter to wait between
	// reconciliation rounds. This allows trading latency with throughput by
	// waiting longer to collect a batch of objects to reconcile.
	RateLimiter *rate.Limiter

	// Operations defines how an object is reconciled.
	Operations Operations[Obj]

	// BatchOperations is optional and if provided these are used instead of normal operations.
	BatchOperations BatchOperations[Obj]
}

type Metrics

type Metrics struct {
	IncrementalReconciliationCount         metric.Vec[metric.Counter]
	IncrementalReconciliationDuration      metric.Vec[metric.Observer]
	IncrementalReconciliationTotalErrors   metric.Vec[metric.Counter]
	IncrementalReconciliationCurrentErrors metric.Vec[metric.Gauge]

	FullReconciliationCount          metric.Vec[metric.Counter]
	FullReconciliationOutOfSyncCount metric.Vec[metric.Counter]
	FullReconciliationTotalErrors    metric.Vec[metric.Counter]
	FullReconciliationDuration       metric.Vec[metric.Observer]
}

type Operations

type Operations[Obj any] interface {
	// Update the object in the target. If the operation is long-running it should
	// abort if context is cancelled. Should return an error if the operation fails.
	// The reconciler will retry the operation again at a later time, potentially
	// with a new version of the object. The operation should thus be idempotent.
	//
	// Update is used both for incremental and full reconciliation. Incremental
	// reconciliation is performed when the desired state is updated. A full
	// reconciliation is done periodically by calling 'Update' on all objects.
	//
	// If 'changed' is non-nil then the Update must compare the realized state
	// with the desired state and set it to true if they differ, e.g. whether
	// the operation resulted in a change to the realized state. This is used
	// during full reconciliation to catch cases where the realized state has
	// gone out of sync due to outside influence. This is tracked in the
	// "full_out_of_sync_total" metric.
	Update(ctx context.Context, txn statedb.ReadTxn, obj Obj, changed *bool) error

	// Delete the object in the target. Same semantics as with Update.
	// Deleting a non-existing object is not an error and returns nil.
	Delete(context.Context, statedb.ReadTxn, Obj) error

	// Prune undesired state. It is given an iterator for the full set of
	// desired objects. The implementation should diff the desired state against
	// the realized state to find things to prune.
	// Invoked during full reconciliation before the individual objects are Update()'d.
	//
	// Unlike failed Update()'s a failed Prune() operation is not retried until
	// the next full reconciliation round.
	Prune(context.Context, statedb.ReadTxn, statedb.Iterator[Obj]) error
}

Operations defines how to reconcile an object.

Each operation is given a context that limits the lifetime of the operation and a ReadTxn to allow for the option of looking up realized state from another statedb table as an optimization (main use-case is reconciling routes against Table[Route] to avoid a syscall per route).

type Params

type Params[Obj comparable] struct {
	cell.In

	Config    Config[Obj]
	Lifecycle cell.Lifecycle
	DB        *statedb.DB
	Table     statedb.RWTable[Obj]
	Jobs      job.Registry
	Metrics   *Metrics
	ModuleId  cell.ModuleID
	Health    cell.Health
}

type Reconciler

type Reconciler[Obj any] interface {
	// TriggerFullReconciliation triggers an immediate full reconciliation,
	// e.g. Prune() of unknown objects and Update() of all objects.
	// Implemented as a select+send to a channel of size 1, so N concurrent
	// calls of this method may result in less than N full reconciliations.
	//
	// Primarily useful in tests, but may be of use when there's knowledge
	// that something has gone wrong in the reconciliation target and full
	// reconciliation is needed to recover.
	TriggerFullReconciliation()
}

func New

func New[Obj comparable](p Params[Obj]) (Reconciler[Obj], error)

New creates and registers a new reconciler.

type Status

type Status struct {
	Kind StatusKind

	// Delete is true if the object should be deleted by the reconciler.
	// If an object is deleted outside the reconciler it will not be
	// processed by the incremental reconciliation.
	// We use soft deletes in order to observe and wait for deletions.
	Delete bool

	UpdatedAt time.Time
	Error     string
}

Status is embedded into the reconcilable object. It allows inspecting per-object reconciliation status and waiting for the reconciler.

func StatusDone

func StatusDone() Status

StatusDone constructs the status that marks the object as reconciled.

func StatusError

func StatusError(delete bool, err error) Status

StatusError constructs the status that marks the object as failed to be reconciled.

func StatusPending

func StatusPending() Status

StatusPending constructs the status for marking the object as requiring reconciliation. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.

func StatusPendingDelete

func StatusPendingDelete() Status

StatusPendingDelete constructs the status for marking the object to be deleted.

The reconciler uses soft-deletes in order to be able to retry and to report failed deletions of objects. When the delete operation is successfully performed the reconciler will delete the object from the table.

func (Status) String

func (s Status) String() string

type StatusKind

type StatusKind string
const (
	StatusKindPending StatusKind = "pending"
	StatusKindDone    StatusKind = "done"
	StatusKindError   StatusKind = "error"
)

func (StatusKind) Key

func (s StatusKind) Key() index.Key

Key implements an optimized construction of index.Key for StatusKind to avoid copying and allocation.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL