Documentation ¶
Index ¶
- Constants
- func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind]
- func WaitForReconciliation[Obj any](ctx context.Context, db *statedb.DB, table statedb.Table[Obj], ...) error
- type BatchEntry
- type BatchOperations
- type ExpVarMetrics
- func (m *ExpVarMetrics) PruneDuration(moduleID cell.FullModuleID, duration time.Duration)
- func (m *ExpVarMetrics) PruneError(moduleID cell.FullModuleID, err error)
- func (m *ExpVarMetrics) ReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
- func (m *ExpVarMetrics) ReconciliationErrors(moduleID cell.FullModuleID, new, current int)
- type Metrics
- type Operations
- type Option
- func WithMetrics(m Metrics) Option
- func WithPruning(interval time.Duration) Option
- func WithRefreshing(interval time.Duration, limiter *rate.Limiter) Option
- func WithRetry(minBackoff, maxBackoff time.Duration) Option
- func WithRoundLimits(numObjects int, limiter *rate.Limiter) Option
- func WithoutPruning() Option
- type Params
- type Reconciler
- type Status
- type StatusKind
- type StatusSet
- func (s StatusSet) All() map[string]Status
- func (s StatusSet) Get(name string) Status
- func (s StatusSet) MarshalJSON() ([]byte, error)
- func (s StatusSet) Pending() StatusSet
- func (s StatusSet) Set(name string, status Status) StatusSet
- func (s StatusSet) String() string
- func (s *StatusSet) UnmarshalJSON(data []byte) error
Constants ¶
const ( OpUpdate = "update" OpDelete = "delete" )
Variables ¶
This section is empty.
Functions ¶
func NewStatusIndex ¶
func NewStatusIndex[Obj any](getObjectStatus func(Obj) Status) statedb.Index[Obj, StatusKind]
NewStatusIndex creates a status index for a table of reconcilable objects. This is optional and should be only used when there is a need to often check that all objects are fully reconciled that outweighs the cost of maintaining a status index.
Types ¶
type BatchEntry ¶
type BatchOperations ¶
type ExpVarMetrics ¶
type ExpVarMetrics struct { ReconciliationCountVar *expvar.Map ReconciliationDurationVar *expvar.Map ReconciliationTotalErrorsVar *expvar.Map ReconciliationCurrentErrorsVar *expvar.Map PruneCountVar *expvar.Map PruneDurationVar *expvar.Map PruneTotalErrorsVar *expvar.Map PruneCurrentErrorsVar *expvar.Map }
func NewExpVarMetrics ¶
func NewExpVarMetrics() *ExpVarMetrics
func NewUnpublishedExpVarMetrics ¶
func NewUnpublishedExpVarMetrics() *ExpVarMetrics
func (*ExpVarMetrics) PruneDuration ¶ added in v0.2.0
func (m *ExpVarMetrics) PruneDuration(moduleID cell.FullModuleID, duration time.Duration)
func (*ExpVarMetrics) PruneError ¶ added in v0.2.0
func (m *ExpVarMetrics) PruneError(moduleID cell.FullModuleID, err error)
func (*ExpVarMetrics) ReconciliationDuration ¶ added in v0.2.0
func (m *ExpVarMetrics) ReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
func (*ExpVarMetrics) ReconciliationErrors ¶ added in v0.2.0
func (m *ExpVarMetrics) ReconciliationErrors(moduleID cell.FullModuleID, new, current int)
type Metrics ¶
type Metrics interface { ReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration) ReconciliationErrors(moduleID cell.FullModuleID, new, current int) PruneError(moduleID cell.FullModuleID, err error) PruneDuration(moduleID cell.FullModuleID, duration time.Duration) }
type Operations ¶
type Operations[Obj any] interface { // Update the object in the target. If the operation is long-running it should // abort if context is cancelled. Should return an error if the operation fails. // The reconciler will retry the operation again at a later time, potentially // with a new version of the object. The operation should thus be idempotent. // // Update is used both for incremental and full reconciliation. Incremental // reconciliation is performed when the desired state is updated. A full // reconciliation is done periodically by calling 'Update' on all objects. // // The object handed to Update is a clone produced by Config.CloneObject // and thus Update can mutate the object. The mutations are only guaranteed // to be retained if the object has a single reconciler (one Status). Update(ctx context.Context, txn statedb.ReadTxn, obj Obj) error // Delete the object in the target. Same semantics as with Update. // Deleting a non-existing object is not an error and returns nil. Delete(context.Context, statedb.ReadTxn, Obj) error // Prune undesired state. It is given an iterator for the full set of // desired objects. The implementation should diff the desired state against // the realized state to find things to prune. // Invoked during full reconciliation before the individual objects are Update()'d. // // Unlike failed Update()'s a failed Prune() operation is not retried until // the next full reconciliation round. Prune(context.Context, statedb.ReadTxn, statedb.Iterator[Obj]) error }
Operations defines how to reconcile an object.
Each operation is given a context that limits the lifetime of the operation and a ReadTxn to allow looking up referenced state.
type Option ¶ added in v0.2.0
type Option func(opts *options)
Option for the reconciler
func WithMetrics ¶ added in v0.2.0
WithMetrics sets the Metrics instance to use with this reconciler. The metrics capture the duration of operations during incremental and full reconcilation and the errors that occur during either.
If this option is not used, then the default metrics instance is used.
func WithPruning ¶ added in v0.2.0
WithPruning enables periodic pruning (calls to Prune() operation) [interval] is the interval at which Prune() is called to prune unexpected objects in the target system. Prune() will not be called before the table has been fully initialized (Initialized() returns true). A single Prune() can be forced via the [Reconciler.Prune] method regardless if pruning has been enabled.
Pruning is enabled by default. See [config.go] for the default interval.
func WithRefreshing ¶ added in v0.2.0
WithRefreshing enables periodic refreshes of objects. [interval] is the interval at which the objects are refreshed, e.g. how often Update() should be called to refresh an object even when it has not changed. This is implemented by periodically setting all objects that have not been updated for [RefreshInterval] or longer as pending. [limiter] is the rate-limiter for controlling the rate at which the objects are marked pending.
Refreshing is disabled by default.
func WithRetry ¶ added in v0.2.0
WithRetry sets the minimum and maximum amount of time to wait before retrying a failed Update() or Delete() operation on an object. The retry wait time for an object will increase exponentially on subsequent failures until [maxBackoff] is reached.
func WithRoundLimits ¶ added in v0.2.0
WithRoundLimits sets the reconciliation round size and rate limit. [numObjects] limits how many objects are reconciled per round before updating their status. A high number will delay status updates and increase latency for those watching the object reconciliation status. A low value increases the overhead of the status committing and reduces effectiveness of the batch operations (smaller batch sizes). [limiter] is used to limit the number of rounds per second to allow a larger batch to build up and to avoid reconciliation of intermediate object states.
func WithoutPruning ¶ added in v0.2.0
func WithoutPruning() Option
WithoutPruning disabled periodic pruning.
type Params ¶
type Params struct { cell.In Lifecycle cell.Lifecycle Log *slog.Logger DB *statedb.DB Jobs job.Registry ModuleID cell.FullModuleID Health cell.Health DefaultMetrics Metrics `optional:"true"` }
Params are the reconciler dependencies that are independent of the use-case.
type Reconciler ¶
type Reconciler[Obj any] interface { // Prune triggers an immediate pruning regardless of [PruneInterval]. // Implemented as a select+send to a channel of size 1, so N concurrent // calls of this method may result in less than N full reconciliations. // This still requires the table to be fully initialized to have an effect. // // Primarily useful in tests, but may be of use when there's knowledge // that something has gone wrong in the reconciliation target and full // reconciliation is needed to recover. Prune() }
func Register ¶
func Register[Obj comparable]( params Params, table statedb.RWTable[Obj], clone func(Obj) Obj, setStatus func(Obj, Status) Obj, getStatus func(Obj) Status, ops Operations[Obj], batchOps BatchOperations[Obj], options ...Option, ) (Reconciler[Obj], error)
Register creates a new reconciler and registers it to the application lifecycle.
The setStatus etc. functions are passed in as arguments rather than requiring the object to implement them via interface as this allows constructing multiple reconcilers for a single object by having multiple status fields and different functions for manipulating them.
type Status ¶
type Status struct { Kind StatusKind UpdatedAt time.Time Error string // contains filtered or unexported fields }
Status is embedded into the reconcilable object. It allows inspecting per-object reconciliation status and waiting for the reconciler. Object may have multiple reconcilers and multiple reconciliation statuses.
func StatusDone ¶
func StatusDone() Status
StatusDone constructs the status that marks the object as reconciled.
func StatusError ¶
statusError constructs the status that marks the object as failed to be reconciled.
func StatusPending ¶
func StatusPending() Status
StatusPending constructs the status for marking the object as requiring reconciliation. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.
func StatusRefreshing ¶ added in v0.2.0
func StatusRefreshing() Status
StatusRefreshing constructs the status for marking the object as requiring refreshing. The reconciler will perform the Update operation and on success transition to Done status, or on failure to Error status.
This is distinct from the Pending status in order to give a hint to the Update operation that this is a refresh of the object and should be forced.
func (Status) IsPendingOrRefreshing ¶ added in v0.2.0
type StatusKind ¶
type StatusKind string
const ( StatusKindPending StatusKind = "Pending" StatusKindRefreshing StatusKind = "Refreshing" StatusKindDone StatusKind = "Done" StatusKindError StatusKind = "Error" )
func (StatusKind) Key ¶
func (s StatusKind) Key() index.Key
Key implements an optimized construction of index.Key for StatusKind to avoid copying and allocation.
type StatusSet ¶ added in v0.2.0
type StatusSet struct {
// contains filtered or unexported fields
}
StatusSet is a set of named statuses. This allows for the use of multiple reconcilers per object when the reconcilers are not known up front.
func NewStatusSet ¶ added in v0.2.0
func NewStatusSet() StatusSet
func (StatusSet) Get ¶ added in v0.2.0
Get returns the status for the named reconciler. Use this to implement 'GetObjectStatus' for your reconciler. If this reconciler is new the status is pending.
func (StatusSet) MarshalJSON ¶ added in v0.2.0
MarshalJSON marshals the StatusSet as a map[string]Status. It carries enough information over to be able to implement String() so this can be used to implement the TableRow() method.
func (StatusSet) Pending ¶ added in v0.2.0
Pending returns a new pending status set. The names of reconcilers are reused to be able to show which are still pending.
func (StatusSet) Set ¶ added in v0.2.0
Set the reconcilation status of the named reconciler. Use this to implement 'SetObjectStatus' for your reconciler.