reconciler

package
v0.2.4
Published: Aug 8, 2024 | License: Apache-2.0 | Imports: 19 | Imported by: 7

Documentation

Constants

const (
	OpUpdate = "update"
	OpDelete = "delete"
)

Functions

func NewStatusIndex

func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind]

NewStatusIndex creates a status index for a table of reconcilable objects. The index is optional and should be used only when the need to frequently check that all objects are fully reconciled outweighs the cost of maintaining the index.

func WaitForReconciliation

func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], statusIndex statedb.Index[Obj, StatusKind]) error

WaitForReconciliation blocks until all objects have been reconciled or the context has been cancelled.

Types

type BatchEntry

type BatchEntry[Obj any] struct {
	Object   Obj
	Revision statedb.Revision
	Result   error
	// contains filtered or unexported fields
}

type BatchOperations

type BatchOperations[Obj any] interface {
	UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []BatchEntry[Obj])
	DeleteBatch(context.Context, statedb.ReadTxn, []BatchEntry[Obj])
}

type ExpVarMetrics

type ExpVarMetrics struct {
	ReconciliationCountVar         *expvar.Map
	ReconciliationDurationVar      *expvar.Map
	ReconciliationTotalErrorsVar   *expvar.Map
	ReconciliationCurrentErrorsVar *expvar.Map

	PruneCountVar         *expvar.Map
	PruneDurationVar      *expvar.Map
	PruneTotalErrorsVar   *expvar.Map
	PruneCurrentErrorsVar *expvar.Map
}

func NewExpVarMetrics

func NewExpVarMetrics() *ExpVarMetrics

func NewUnpublishedExpVarMetrics

func NewUnpublishedExpVarMetrics() *ExpVarMetrics

func (*ExpVarMetrics) PruneDuration added in v0.2.0

func (m *ExpVarMetrics) PruneDuration(moduleID cell.FullModuleID, duration time.Duration)

func (*ExpVarMetrics) PruneError added in v0.2.0

func (m *ExpVarMetrics) PruneError(moduleID cell.FullModuleID, err error)

func (*ExpVarMetrics) ReconciliationDuration added in v0.2.0

func (m *ExpVarMetrics) ReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)

func (*ExpVarMetrics) ReconciliationErrors added in v0.2.0

func (m *ExpVarMetrics) ReconciliationErrors(moduleID cell.FullModuleID, new, current int)

type Metrics

type Metrics interface {
	ReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
	ReconciliationErrors(moduleID cell.FullModuleID, new, current int)

	PruneError(moduleID cell.FullModuleID, err error)
	PruneDuration(moduleID cell.FullModuleID, duration time.Duration)
}

type Operations

type Operations[Obj any] interface {
	// Update the object in the target. If the operation is long-running it should
	// abort if context is cancelled. Should return an error if the operation fails.
	// The reconciler will retry the operation again at a later time, potentially
	// with a new version of the object. The operation should thus be idempotent.
	//
	// Update is used both for incremental and full reconciliation. Incremental
	// reconciliation is performed when the desired state is updated. A full
	// reconciliation is done periodically by calling 'Update' on all objects.
	//
	// The object handed to Update is a clone produced by Config.CloneObject
	// and thus Update can mutate the object. The mutations are only guaranteed
	// to be retained if the object has a single reconciler (one Status).
	Update(ctx context.Context, txn statedb.ReadTxn, obj Obj) error

	// Delete the object in the target. Same semantics as with Update.
	// Deleting a non-existing object is not an error and returns nil.
	Delete(context.Context, statedb.ReadTxn, Obj) error

	// Prune undesired state. It is given an iterator for the full set of
	// desired objects. The implementation should diff the desired state against
	// the realized state to find things to prune.
	// Invoked during full reconciliation before the individual objects are Update()'d.
	//
	// Unlike a failed Update(), a failed Prune() operation is not retried until
	// the next full reconciliation round.
	Prune(context.Context, statedb.ReadTxn, statedb.Iterator[Obj]) error
}

Operations defines how to reconcile an object.

Each operation is given a context that limits the lifetime of the operation and a ReadTxn to allow looking up referenced state.
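The contract described above (idempotent Update, Delete that tolerates missing objects, Prune that diffs desired against realized state) can be sketched against an in-memory target. This is a minimal illustration, not the package's own code: the route and memTarget types and the reconcileAll helper are hypothetical, the statedb.ReadTxn parameter is omitted, and a plain slice stands in for statedb.Iterator so that the example needs only the standard library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// route is a hypothetical desired-state object.
type route struct {
	Dest    string
	Gateway string
}

// memTarget is a hypothetical reconciliation target: an in-memory routing table.
type memTarget struct {
	routes map[string]string // destination -> gateway
}

// Update is idempotent: applying the same route twice leaves the same state.
func (t *memTarget) Update(ctx context.Context, r route) error {
	if err := ctx.Err(); err != nil {
		return err // abort when the context is cancelled
	}
	if r.Gateway == "" {
		return errors.New("route has no gateway")
	}
	t.routes[r.Dest] = r.Gateway
	return nil
}

// Delete treats a missing route as success.
func (t *memTarget) Delete(ctx context.Context, r route) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	delete(t.routes, r.Dest)
	return nil
}

// Prune removes every realized route that is not in the desired set.
func (t *memTarget) Prune(ctx context.Context, desired []route) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	keep := make(map[string]bool, len(desired))
	for _, r := range desired {
		keep[r.Dest] = true
	}
	for dest := range t.routes {
		if !keep[dest] {
			delete(t.routes, dest)
		}
	}
	return nil
}

// reconcileAll sketches one full round: Prune first, then Update every object.
// It returns the destinations whose Update failed.
func reconcileAll(t *memTarget, desired []route) []string {
	ctx := context.Background()
	var failed []string
	if err := t.Prune(ctx, desired); err != nil {
		failed = append(failed, "prune: "+err.Error())
	}
	for _, r := range desired {
		if err := t.Update(ctx, r); err != nil {
			failed = append(failed, r.Dest)
		}
	}
	return failed
}

func main() {
	target := &memTarget{routes: map[string]string{"10.9.0.0/16": "stale"}}
	desired := []route{
		{Dest: "10.0.0.0/8", Gateway: "gw1"},
		{Dest: "10.1.0.0/16", Gateway: ""},
	}
	fmt.Println(reconcileAll(target, desired), len(target.routes))
}
```

Running the round a second time with the same desired set leaves the target unchanged, which is the property that makes retries safe.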

type Option added in v0.2.0

type Option func(opts *options)

Option for the reconciler
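Option follows the functional-options pattern. The sketch below shows how such options are typically applied; the fields of the options struct, the default values, and the lower-case helper names are assumptions made for illustration, since the real struct is unexported.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical stand-in for the package's unexported options struct.
type options struct {
	pruneInterval time.Duration // zero means periodic pruning is disabled
	retryMin      time.Duration
	retryMax      time.Duration
}

// Option has the same shape as the documented Option type.
type Option func(opts *options)

// withPruning, withoutPruning and withRetry are local sketches of option constructors.
func withPruning(interval time.Duration) Option {
	return func(o *options) { o.pruneInterval = interval }
}

func withoutPruning() Option {
	return func(o *options) { o.pruneInterval = 0 }
}

func withRetry(minBackoff, maxBackoff time.Duration) Option {
	return func(o *options) { o.retryMin, o.retryMax = minBackoff, maxBackoff }
}

// applyOptions starts from assumed defaults and applies each option in order,
// so a later option overrides an earlier one.
func applyOptions(opts ...Option) options {
	o := options{
		pruneInterval: time.Hour,
		retryMin:      100 * time.Millisecond,
		retryMax:      time.Minute,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// exampleOptions combines a few options the way a caller might.
func exampleOptions() options {
	return applyOptions(
		withPruning(10*time.Minute),
		withRetry(time.Second, 30*time.Second),
		withoutPruning(), // applied last, so pruning ends up disabled
	)
}

func main() {
	o := exampleOptions()
	fmt.Println(o.pruneInterval, o.retryMin, o.retryMax)
}
```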

func WithMetrics added in v0.2.0

func WithMetrics(m Metrics) Option

WithMetrics sets the Metrics instance to use with this reconciler. The metrics capture the duration of operations during incremental and full reconciliation and the errors that occur during either.

If this option is not used, then the default metrics instance is used.
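A custom Metrics implementation only needs the four methods of the Metrics interface. The following sketch keeps simple in-memory counters. The fullModuleID type is a local stand-in for cell.FullModuleID, and the reading of the new and current arguments (errors that newly occurred versus errors still outstanding) is an assumption, not something this page states.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"time"
)

// fullModuleID is a local stand-in for cell.FullModuleID (an assumption).
type fullModuleID []string

// countingMetrics is a hypothetical in-memory metrics collector.
type countingMetrics struct {
	mu            sync.Mutex
	durations     map[string]time.Duration // "module/operation" -> total time spent
	totalErrors   map[string]int           // module -> errors seen so far
	currentErrors map[string]int           // module -> errors outstanding now
	pruneErrors   int
}

func newCountingMetrics() *countingMetrics {
	return &countingMetrics{
		durations:     map[string]time.Duration{},
		totalErrors:   map[string]int{},
		currentErrors: map[string]int{},
	}
}

func (m *countingMetrics) ReconciliationDuration(id fullModuleID, operation string, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.durations[strings.Join(id, ".")+"/"+operation] += d
}

func (m *countingMetrics) ReconciliationErrors(id fullModuleID, newErrs, current int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := strings.Join(id, ".")
	m.totalErrors[key] += newErrs
	m.currentErrors[key] = current
}

func (m *countingMetrics) PruneError(id fullModuleID, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if err != nil {
		m.pruneErrors++
	}
}

func (m *countingMetrics) PruneDuration(id fullModuleID, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.durations[strings.Join(id, ".")+"/prune"] += d
}

func main() {
	m := newCountingMetrics()
	id := fullModuleID{"agent", "routes"}
	m.ReconciliationDuration(id, "update", 3*time.Millisecond)
	m.ReconciliationErrors(id, 2, 2)
	m.ReconciliationErrors(id, 0, 1)
	fmt.Println(m.durations["agent.routes/update"], m.totalErrors["agent.routes"], m.currentErrors["agent.routes"])
}
```

The operation strings would normally be the OpUpdate and OpDelete constants listed under Constants.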

func WithPruning added in v0.2.0

func WithPruning(interval time.Duration) Option

WithPruning enables periodic pruning (calls to the Prune() operation). [interval] is the interval at which Prune() is called to prune unexpected objects in the target system. Prune() will not be called before the table has been fully initialized (Initialized() returns true). A single Prune() can be forced via the [Reconciler.Prune] method regardless of whether pruning has been enabled.

Pruning is enabled by default. See [config.go] for the default interval.

func WithRefreshing added in v0.2.0

func WithRefreshing(interval time.Duration, limiter *rate.Limiter) Option

WithRefreshing enables periodic refreshes of objects. [interval] is the interval at which the objects are refreshed, i.e. how often Update() should be called to refresh an object even when it has not changed. This is implemented by periodically marking as pending all objects that have not been updated for [interval] or longer. [limiter] is the rate limiter that controls the rate at which the objects are marked pending.

Refreshing is disabled by default.
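The refresh mechanism described above can be pictured as a periodic scan that flags objects whose last update is older than the interval. The sketch below is a simplified model under stated assumptions: the object type, the markStale helper, and the string status kinds are hypothetical, and the rate limiter is left out.

```go
package main

import (
	"fmt"
	"time"
)

// object is a hypothetical reconcilable object carrying a simplified status.
type object struct {
	Name      string
	Kind      string // "Pending", "Refreshing", "Done" or "Error"
	UpdatedAt time.Time
}

// markStale flags every reconciled object that was last updated at least
// interval ago as "Refreshing" and returns how many objects were flagged.
func markStale(objs []object, now time.Time, interval time.Duration) int {
	flagged := 0
	for i := range objs {
		if objs[i].Kind == "Done" && now.Sub(objs[i].UpdatedAt) >= interval {
			objs[i].Kind = "Refreshing"
			flagged++
		}
	}
	return flagged
}

// demoRefresh runs markStale over one fresh and one stale object.
func demoRefresh() (int, []object) {
	now := time.Date(2024, 8, 8, 12, 0, 0, 0, time.UTC)
	objs := []object{
		{Name: "fresh", Kind: "Done", UpdatedAt: now.Add(-time.Minute)},
		{Name: "stale", Kind: "Done", UpdatedAt: now.Add(-time.Hour)},
	}
	return markStale(objs, now, 30*time.Minute), objs
}

func main() {
	n, objs := demoRefresh()
	fmt.Println(n, objs[0].Kind, objs[1].Kind)
}
```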

func WithRetry added in v0.2.0

func WithRetry(minBackoff, maxBackoff time.Duration) Option

WithRetry sets the minimum and maximum amount of time to wait before retrying a failed Update() or Delete() operation on an object. The retry wait time for an object will increase exponentially on subsequent failures until [maxBackoff] is reached.
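To make the growth of the retry wait concrete, here is a small sketch of a capped exponential backoff. The doubling factor and the backoff helper are assumptions for illustration; the text above only states that the wait increases exponentially until [maxBackoff] is reached.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the wait before the retry that follows the given number of
// consecutive failures (1 for the first failure). The wait starts at
// minBackoff, doubles with each further failure and is capped at maxBackoff.
func backoff(minBackoff, maxBackoff time.Duration, failures int) time.Duration {
	d := minBackoff
	for i := 1; i < failures; i++ {
		if d > maxBackoff/2 {
			return maxBackoff // doubling would exceed the cap
		}
		d *= 2
	}
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}

func main() {
	for failures := 1; failures <= 6; failures++ {
		fmt.Println(failures, backoff(100*time.Millisecond, time.Second, failures))
	}
}
```

With a minimum of 100ms and a maximum of 1s, this sketch yields 100ms, 200ms, 400ms, 800ms and then 1s for every further failure.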

func WithRoundLimits added in v0.2.0

func WithRoundLimits(numObjects int, limiter *rate.Limiter) Option

WithRoundLimits sets the reconciliation round size and rate limit. [numObjects] limits how many objects are reconciled per round before their status is updated. A high value delays status updates and increases latency for those watching the object reconciliation status. A low value increases the overhead of committing statuses and reduces the effectiveness of the batch operations (smaller batch sizes). [limiter] limits the number of rounds per second, which allows a larger batch to build up and avoids reconciling intermediate object states.
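The effect of [numObjects] on batch sizes can be seen in a small sketch that splits a queue of pending objects into rounds. The rounds helper is hypothetical and models only the size limit; the rate limiting between rounds is not shown.

```go
package main

import "fmt"

// rounds splits the pending object names into consecutive rounds of at most
// numObjects entries each. Statuses would be committed once per round.
func rounds(pending []string, numObjects int) [][]string {
	if numObjects < 1 {
		numObjects = 1
	}
	var out [][]string
	for len(pending) > 0 {
		n := numObjects
		if n > len(pending) {
			n = len(pending)
		}
		out = append(out, pending[:n])
		pending = pending[n:]
	}
	return out
}

func main() {
	pending := []string{"a", "b", "c", "d", "e"}
	for i, r := range rounds(pending, 2) {
		fmt.Println("round", i+1, r)
	}
}
```

Five pending objects with a round size of 2 produce three rounds and therefore three status commits; a round size of 5 would produce one larger batch and a single commit.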

func WithoutPruning added in v0.2.0

func WithoutPruning() Option

WithoutPruning disables periodic pruning.

type Params

type Params struct {
	cell.In

	Lifecycle      cell.Lifecycle
	Log            *slog.Logger
	DB             *statedb.DB
	Jobs           job.Registry
	ModuleID       cell.FullModuleID
	Health         cell.Health
	DefaultMetrics Metrics `optional:"true"`
}

Params are the reconciler dependencies that are independent of the use-case.

type Reconciler

type Reconciler[Obj any] interface {
	// Prune triggers an immediate pruning regardless of [PruneInterval].
	// Implemented as a select+send to a channel of size 1, so N concurrent
	// calls of this method may result in fewer than N full reconciliations.
	// This still requires the table to be fully initialized to have an effect.
	//
	// Primarily useful in tests, but may be of use when there's knowledge
	// that something has gone wrong in the reconciliation target and full
	// reconciliation is needed to recover.
	Prune()
}
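The "select+send to a channel of size 1" behaviour mentioned in the Prune comment can be sketched as follows. The pruneTrigger type is hypothetical and only models the coalescing of requests, not the reconciler itself.

```go
package main

import "fmt"

// pruneTrigger is a hypothetical model of a coalescing trigger.
type pruneTrigger struct {
	ch chan struct{}
}

func newPruneTrigger() *pruneTrigger {
	return &pruneTrigger{ch: make(chan struct{}, 1)}
}

// Prune requests a pruning round without ever blocking the caller.
func (p *pruneTrigger) Prune() {
	select {
	case p.ch <- struct{}{}:
		// request queued
	default:
		// a request is already queued, so this one is coalesced into it
	}
}

// queued reports how many requests are waiting to be consumed.
func (p *pruneTrigger) queued() int {
	return len(p.ch)
}

func main() {
	trigger := newPruneTrigger()
	for i := 0; i < 5; i++ {
		trigger.Prune()
	}
	fmt.Println(trigger.queued()) // prints 1: five calls collapse into one request
}
```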

func Register

func Register[Obj comparable](
	params Params,
	table statedb.RWTable[Obj],
	clone func(Obj) Obj,
	setStatus func(Obj, Status) Obj,
	getStatus func(Obj) Status,
	ops Operations[Obj],
	batchOps BatchOperations[Obj],
	options ...Option,
) (Reconciler[Obj], error)

Register creates a new reconciler and registers it to the application lifecycle.

The clone, setStatus, and getStatus functions are passed in as arguments rather than required as interface methods on the object. This allows constructing multiple reconcilers for a single object by giving the object multiple status fields and a separate set of functions for each.
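As an illustration of why the accessors are plain functions, the sketch below gives one object two status fields, each with its own getter and setter. The status and backend types and all function names are hypothetical stand-ins; in real use the status type would be this package's Status and each function set would be handed to its own Register call.

```go
package main

import "fmt"

// status is a simplified stand-in for the package's Status type.
type status struct {
	Kind  string
	Error string
}

// backend is a hypothetical object that two independent reconcilers act on.
type backend struct {
	Name         string
	KernelStatus status
	ProxyStatus  status
}

// cloneBackend returns a shallow copy that is safe to mutate.
func cloneBackend(b *backend) *backend {
	c := *b
	return &c
}

// Accessors for the first (hypothetical) reconciler.
func setKernelStatus(b *backend, s status) *backend { b.KernelStatus = s; return b }
func getKernelStatus(b *backend) status             { return b.KernelStatus }

// Accessors for the second (hypothetical) reconciler.
func setProxyStatus(b *backend, s status) *backend { b.ProxyStatus = s; return b }
func getProxyStatus(b *backend) status             { return b.ProxyStatus }

func main() {
	orig := &backend{
		Name:         "b1",
		KernelStatus: status{Kind: "Pending"},
		ProxyStatus:  status{Kind: "Pending"},
	}
	// The setter is applied to a clone, so the original stays untouched.
	updated := setKernelStatus(cloneBackend(orig), status{Kind: "Done"})
	fmt.Println(getKernelStatus(orig).Kind, getKernelStatus(updated).Kind, getProxyStatus(updated).Kind)
	// prints: Pending Done Pending
}
```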

type Status

type Status struct {
	Kind      StatusKind
	UpdatedAt time.Time
	Error     string
	// contains filtered or unexported fields
}

Status is embedded into the reconcilable object. It allows inspecting the per-object reconciliation status and waiting for the reconciler. An object may have multiple reconcilers and therefore multiple reconciliation statuses.

func StatusDone

func StatusDone() Status

StatusDone constructs the status that marks the object as reconciled.

func StatusError

func StatusError(err error) Status

StatusError constructs the status that marks the object as having failed reconciliation.

func StatusPending

func StatusPending() Status

StatusPending constructs the status for marking the object as requiring reconciliation. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.

func StatusRefreshing added in v0.2.0

func StatusRefreshing() Status

StatusRefreshing constructs the status for marking the object as requiring refreshing. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.

This is distinct from the Pending status in order to give a hint to the Update operation that this is a refresh of the object and should be forced.
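The difference between Pending and Refreshing can be sketched with an Update operation that skips the write when the target already matches, unless the object is being refreshed. Everything here is a simplified model: the statusKind constants mirror the documented kinds, while update, afterUpdate and the map-backed target are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type statusKind string

const (
	kindPending    statusKind = "Pending"
	kindRefreshing statusKind = "Refreshing"
	kindDone       statusKind = "Done"
	kindError      statusKind = "Error"
)

// errDemo is a placeholder failure used in the example.
var errDemo = errors.New("demo failure")

// update writes value under key in the target. For a Pending object the write
// is skipped when the target already holds the value; for a Refreshing object
// the write is forced. It reports whether a write happened.
func update(target map[string]string, key, value string, kind statusKind) bool {
	if kind != kindRefreshing && target[key] == value {
		return false
	}
	target[key] = value
	return true
}

// afterUpdate returns the kind an object moves to once Update has returned.
func afterUpdate(err error) statusKind {
	if err != nil {
		return kindError
	}
	return kindDone
}

func main() {
	target := map[string]string{"mtu": "1500"}
	fmt.Println(update(target, "mtu", "1500", kindPending))    // false: nothing to do
	fmt.Println(update(target, "mtu", "1500", kindRefreshing)) // true: forced rewrite
	fmt.Println(afterUpdate(nil), afterUpdate(errDemo))        // Done Error
}
```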

func (Status) IsPendingOrRefreshing added in v0.2.0

func (s Status) IsPendingOrRefreshing() bool

func (Status) String

func (s Status) String() string

type StatusKind

type StatusKind string

const (
	StatusKindPending    StatusKind = "Pending"
	StatusKindRefreshing StatusKind = "Refreshing"
	StatusKindDone       StatusKind = "Done"
	StatusKindError      StatusKind = "Error"
)

func (StatusKind) Key

func (s StatusKind) Key() index.Key

Key implements an optimized construction of index.Key for StatusKind to avoid copying and allocation.

type StatusSet added in v0.2.0

type StatusSet struct {
	// contains filtered or unexported fields
}

StatusSet is a set of named statuses. This allows for the use of multiple reconcilers per object when the reconcilers are not known up front.

func NewStatusSet added in v0.2.0

func NewStatusSet() StatusSet

func (StatusSet) All added in v0.2.0

func (s StatusSet) All() map[string]Status

func (StatusSet) Get added in v0.2.0

func (s StatusSet) Get(name string) Status

Get returns the status for the named reconciler. Use this to implement 'GetObjectStatus' for your reconciler. If the named reconciler is new, the returned status is pending.

func (StatusSet) MarshalJSON added in v0.2.0

func (s StatusSet) MarshalJSON() ([]byte, error)

MarshalJSON marshals the StatusSet as a map[string]Status. It carries enough information over to be able to implement String() so this can be used to implement the TableRow() method.

func (StatusSet) Pending added in v0.2.0

func (s StatusSet) Pending() StatusSet

Pending returns a new pending status set. The names of reconcilers are reused to be able to show which are still pending.

func (StatusSet) Set added in v0.2.0

func (s StatusSet) Set(name string, status Status) StatusSet

Set sets the reconciliation status of the named reconciler. Use this to implement 'SetObjectStatus' for your reconciler.

func (StatusSet) String added in v0.2.0

func (s StatusSet) String() string

func (*StatusSet) UnmarshalJSON added in v0.2.0

func (s *StatusSet) UnmarshalJSON(data []byte) error
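The documented behaviour of Get, Set and Pending can be modelled with a small value type that is copied on every change. This is a sketch of the semantics only: the map-backed statusSet below is an assumption, and the real StatusSet keeps its fields unexported and may be implemented differently.

```go
package main

import "fmt"

// status is a simplified stand-in for the package's Status type.
type status struct {
	Kind string
}

// statusSet is a hypothetical model of a set of named statuses.
type statusSet struct {
	statuses map[string]status
}

// Get returns the status of the named reconciler, or a pending status if the
// name has not been seen before.
func (s statusSet) Get(name string) status {
	if st, ok := s.statuses[name]; ok {
		return st
	}
	return status{Kind: "Pending"}
}

// Set returns a new set with the named status replaced; the receiver is unchanged.
func (s statusSet) Set(name string, st status) statusSet {
	next := make(map[string]status, len(s.statuses)+1)
	for n, v := range s.statuses {
		next[n] = v
	}
	next[name] = st
	return statusSet{statuses: next}
}

// Pending returns a new set with the same names, all marked pending.
func (s statusSet) Pending() statusSet {
	next := make(map[string]status, len(s.statuses))
	for n := range s.statuses {
		next[n] = status{Kind: "Pending"}
	}
	return statusSet{statuses: next}
}

func main() {
	var empty statusSet
	s := empty.Set("kernel", status{Kind: "Done"})
	fmt.Println(s.Get("kernel").Kind, s.Get("proxy").Kind, s.Pending().Get("kernel").Kind)
	// prints: Done Pending Pending
}
```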
