reconciliation

package
v0.0.0-...-54d0854 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	Created metricStartTime = iota
	PickedUp
)

Variables

This section is empty.

Functions

func IsAlreadyInStateError

func IsAlreadyInStateError(err error) bool

func IsDuplicateClusterReconciliationError

func IsDuplicateClusterReconciliationError(err error) bool

Types

type CurrentlyReconciling

type CurrentlyReconciling struct {
}

func (*CurrentlyReconciling) FilterByInstance

func (*CurrentlyReconciling) FilterByQuery

func (cr *CurrentlyReconciling) FilterByQuery(q *db.Select) error

type CurrentlyReconcilingWithRuntimeID

type CurrentlyReconcilingWithRuntimeID struct {
	RuntimeID string
}

func (*CurrentlyReconcilingWithRuntimeID) FilterByInstance

func (*CurrentlyReconcilingWithRuntimeID) FilterByQuery

func (cr *CurrentlyReconcilingWithRuntimeID) FilterByQuery(q *db.Select) error

type DuplicateClusterReconciliationError

type DuplicateClusterReconciliationError struct {
	// contains filtered or unexported fields
}

func (*DuplicateClusterReconciliationError) Error

type Filter

type Filter interface {
	FilterByQuery(q *db.Select) error
	FilterByInstance(i *model.ReconciliationEntity) *model.ReconciliationEntity //return nil to ignore instance in result
}

type FilterMixer

type FilterMixer struct {
	Filters []Filter
}

func (*FilterMixer) FilterByInstance

func (*FilterMixer) FilterByQuery

func (fm *FilterMixer) FilterByQuery(q *db.Select) error

type InMemoryReconciliationRepository

type InMemoryReconciliationRepository struct {
	// contains filtered or unexported fields
}

func (*InMemoryReconciliationRepository) CreateReconciliation

func (*InMemoryReconciliationRepository) EnableDebugLogging

func (r *InMemoryReconciliationRepository) EnableDebugLogging(schedulingID string, correlationID ...string) error

func (*InMemoryReconciliationRepository) FinishReconciliation

func (r *InMemoryReconciliationRepository) FinishReconciliation(schedulingID string, status *model.ClusterStatusEntity) error

func (*InMemoryReconciliationRepository) GetAllComponents

func (r *InMemoryReconciliationRepository) GetAllComponents() ([]string, error)

func (*InMemoryReconciliationRepository) GetComponentOperationProcessingDuration

func (r *InMemoryReconciliationRepository) GetComponentOperationProcessingDuration(component string, state model.OperationState) (int64, error)

func (*InMemoryReconciliationRepository) GetMothershipOperationProcessingDuration

func (r *InMemoryReconciliationRepository) GetMothershipOperationProcessingDuration(component string, state model.OperationState, startTime metricStartTime) (int64, error)

func (*InMemoryReconciliationRepository) GetOperation

func (r *InMemoryReconciliationRepository) GetOperation(schedulingID, correlationID string) (*model.OperationEntity, error)

func (*InMemoryReconciliationRepository) GetOperations

func (*InMemoryReconciliationRepository) GetProcessableOperations

func (r *InMemoryReconciliationRepository) GetProcessableOperations(maxParallelOpsPerRecon int) ([]*model.OperationEntity, error)

func (*InMemoryReconciliationRepository) GetReconciliation

func (r *InMemoryReconciliationRepository) GetReconciliation(schedulingID string) (*model.ReconciliationEntity, error)

func (*InMemoryReconciliationRepository) GetReconciliations

func (r *InMemoryReconciliationRepository) GetReconciliations(filter Filter) ([]*model.ReconciliationEntity, error)

func (*InMemoryReconciliationRepository) GetReconcilingOperations

func (r *InMemoryReconciliationRepository) GetReconcilingOperations() ([]*model.OperationEntity, error)

func (*InMemoryReconciliationRepository) GetRuntimeIDs

func (r *InMemoryReconciliationRepository) GetRuntimeIDs() ([]string, error)

func (*InMemoryReconciliationRepository) RemoveReconciliationByRuntimeID

func (r *InMemoryReconciliationRepository) RemoveReconciliationByRuntimeID(runtimeID string) error

func (*InMemoryReconciliationRepository) RemoveReconciliationBySchedulingID

func (r *InMemoryReconciliationRepository) RemoveReconciliationBySchedulingID(schedulingID string) error

func (*InMemoryReconciliationRepository) RemoveReconciliationsBeforeDeadline

func (r *InMemoryReconciliationRepository) RemoveReconciliationsBeforeDeadline(runtimeID string, latestSchedulingID string, deadline time.Time) error

func (*InMemoryReconciliationRepository) RemoveReconciliationsBySchedulingID

func (r *InMemoryReconciliationRepository) RemoveReconciliationsBySchedulingID(schedulingIDs []interface{}) error

func (*InMemoryReconciliationRepository) UpdateComponentOperationProcessingDuration

func (r *InMemoryReconciliationRepository) UpdateComponentOperationProcessingDuration(schedulingID, correlationID string, processingDuration int) error

func (*InMemoryReconciliationRepository) UpdateOperationPickedUp

func (r *InMemoryReconciliationRepository) UpdateOperationPickedUp(schedulingID, correlationID string) error

func (*InMemoryReconciliationRepository) UpdateOperationRetryID

func (r *InMemoryReconciliationRepository) UpdateOperationRetryID(schedulingID, correlationID, retryID string) error

func (*InMemoryReconciliationRepository) UpdateOperationState

func (r *InMemoryReconciliationRepository) UpdateOperationState(schedulingID, correlationID string, state model.OperationState, allowInState bool, reasons ...string) error

func (*InMemoryReconciliationRepository) WithTx

type Limit

type Limit struct {
	Count int
	// contains filtered or unexported fields
}

func (*Limit) FilterByInstance

func (l *Limit) FilterByInstance(re *model.ReconciliationEntity) *model.ReconciliationEntity

func (*Limit) FilterByQuery

func (l *Limit) FilterByQuery(q *db.Select) error

TODO: Limit should apply only the limit, without also sorting.

type MockRepository

type MockRepository struct {
	CreateReconciliationResult                          *model.ReconciliationEntity
	RemoveReconciliationResult                          error
	RemoveReconciliationRecording                       []string
	GetReconciliationResult                             *model.ReconciliationEntity
	GetReconciliationsResult                            []*model.ReconciliationEntity
	GetReconciliationsCount                             int
	OnGetReconciliations                                func(*MockRepository)
	FinishReconciliationResult                          error
	GetOperationsResult                                 []*model.OperationEntity
	GetOperationResult                                  *model.OperationEntity
	GetProcessableOperationsResult                      []*model.OperationEntity
	GetReconcilingOperationsResult                      []*model.OperationEntity
	UpdateOperationStateResult                          error
	UpdateOperationRetryIDResult                        error
	UpdateOperationPickedUpResult                       error
	UpdateComponentOperationProcessingDurationResult    error
	GetComponentOperationProcessingDurationResult       int64
	GetComponentOperationProcessingDurationResultError  error
	GetMothershipOperationProcessingDurationResult      int64
	GetMothershipOperationProcessingDurationResultError error
	GetAllComponentsResult                              []string
	GetAllComponentsResultError                         error
	EnableDebugLoggingResult                            error
	GetStatusIDsOlderThanDeadlineResult                 map[int64]bool
}

func (*MockRepository) CreateReconciliation

func (*MockRepository) EnableDebugLogging

func (mr *MockRepository) EnableDebugLogging(schedulingID string, correlationID ...string) error

func (*MockRepository) FinishReconciliation

func (mr *MockRepository) FinishReconciliation(schedulingID string, status *model.ClusterStatusEntity) error

func (*MockRepository) GetAllComponents

func (mr *MockRepository) GetAllComponents() ([]string, error)

func (*MockRepository) GetComponentOperationProcessingDuration

func (mr *MockRepository) GetComponentOperationProcessingDuration(component string, state model.OperationState) (int64, error)

func (*MockRepository) GetMothershipOperationProcessingDuration

func (mr *MockRepository) GetMothershipOperationProcessingDuration(component string, state model.OperationState, startTime metricStartTime) (int64, error)

func (*MockRepository) GetOperation

func (mr *MockRepository) GetOperation(schedulingID, correlationID string) (*model.OperationEntity, error)

func (*MockRepository) GetOperations

func (mr *MockRepository) GetOperations(filters operation.Filter) ([]*model.OperationEntity, error)

func (*MockRepository) GetProcessableOperations

func (mr *MockRepository) GetProcessableOperations(maxParallelOpsPerRecon int) ([]*model.OperationEntity, error)

func (*MockRepository) GetReconciliation

func (mr *MockRepository) GetReconciliation(schedulingID string) (*model.ReconciliationEntity, error)

func (*MockRepository) GetReconciliations

func (mr *MockRepository) GetReconciliations(filter Filter) ([]*model.ReconciliationEntity, error)

func (*MockRepository) GetReconcilingOperations

func (mr *MockRepository) GetReconcilingOperations() ([]*model.OperationEntity, error)

func (*MockRepository) GetRuntimeIDs

func (mr *MockRepository) GetRuntimeIDs() ([]string, error)

func (*MockRepository) RemoveReconciliationByRuntimeID

func (mr *MockRepository) RemoveReconciliationByRuntimeID(runtimeID string) error

func (*MockRepository) RemoveReconciliationBySchedulingID

func (mr *MockRepository) RemoveReconciliationBySchedulingID(schedulingID string) error

func (*MockRepository) RemoveReconciliationsBeforeDeadline

func (mr *MockRepository) RemoveReconciliationsBeforeDeadline(runtimeID string, latestSchedulingID string, deadline time.Time) error

func (*MockRepository) RemoveReconciliationsBySchedulingID

func (mr *MockRepository) RemoveReconciliationsBySchedulingID(schedulingIDs []interface{}) error

func (*MockRepository) UpdateComponentOperationProcessingDuration

func (mr *MockRepository) UpdateComponentOperationProcessingDuration(schedulingID, correlationID string, processingDuration int) error

func (*MockRepository) UpdateOperationPickedUp

func (mr *MockRepository) UpdateOperationPickedUp(schedulingID, correlationID string) error

func (*MockRepository) UpdateOperationRetryID

func (mr *MockRepository) UpdateOperationRetryID(schedulingID, correlationID, retryID string) error

func (*MockRepository) UpdateOperationState

func (mr *MockRepository) UpdateOperationState(schedulingID, correlationID string, state model.OperationState, allowInState bool, reason ...string) error

func (*MockRepository) WithTx

func (mr *MockRepository) WithTx(tx *db.TxConnection) (Repository, error)

type PersistentReconciliationRepository

type PersistentReconciliationRepository struct {
	*repository.Repository
}

func (*PersistentReconciliationRepository) CreateReconciliation

func (*PersistentReconciliationRepository) EnableDebugLogging

func (r *PersistentReconciliationRepository) EnableDebugLogging(schedulingID string, correlationID ...string) error

func (*PersistentReconciliationRepository) FinishReconciliation

func (r *PersistentReconciliationRepository) FinishReconciliation(schedulingID string, status *model.ClusterStatusEntity) error

func (*PersistentReconciliationRepository) GetAllComponents

func (r *PersistentReconciliationRepository) GetAllComponents() ([]string, error)

func (*PersistentReconciliationRepository) GetComponentOperationProcessingDuration

func (r *PersistentReconciliationRepository) GetComponentOperationProcessingDuration(component string, state model.OperationState) (int64, error)

func (*PersistentReconciliationRepository) GetMothershipOperationProcessingDuration

func (r *PersistentReconciliationRepository) GetMothershipOperationProcessingDuration(component string, state model.OperationState, startTime metricStartTime) (int64, error)

func (*PersistentReconciliationRepository) GetOperation

func (r *PersistentReconciliationRepository) GetOperation(schedulingID, correlationID string) (*model.OperationEntity, error)

func (*PersistentReconciliationRepository) GetOperations

func (*PersistentReconciliationRepository) GetProcessableOperations

func (r *PersistentReconciliationRepository) GetProcessableOperations(maxParallelOpsPerRecon int) ([]*model.OperationEntity, error)

func (*PersistentReconciliationRepository) GetReconciliation

func (r *PersistentReconciliationRepository) GetReconciliation(schedulingID string) (*model.ReconciliationEntity, error)

func (*PersistentReconciliationRepository) GetReconciliations

func (r *PersistentReconciliationRepository) GetReconciliations(filter Filter) ([]*model.ReconciliationEntity, error)

func (*PersistentReconciliationRepository) GetReconcilingOperations

func (r *PersistentReconciliationRepository) GetReconcilingOperations() ([]*model.OperationEntity, error)

func (*PersistentReconciliationRepository) GetRuntimeIDs

func (r *PersistentReconciliationRepository) GetRuntimeIDs() ([]string, error)

func (*PersistentReconciliationRepository) RemoveReconciliationByRuntimeID

func (r *PersistentReconciliationRepository) RemoveReconciliationByRuntimeID(runtimeID string) error

func (*PersistentReconciliationRepository) RemoveReconciliationBySchedulingID

func (r *PersistentReconciliationRepository) RemoveReconciliationBySchedulingID(schedulingID string) error

func (*PersistentReconciliationRepository) RemoveReconciliationsBeforeDeadline

func (r *PersistentReconciliationRepository) RemoveReconciliationsBeforeDeadline(runtimeID string, latestSchedulingID string, deadline time.Time) error

func (*PersistentReconciliationRepository) RemoveReconciliationsBySchedulingID

func (r *PersistentReconciliationRepository) RemoveReconciliationsBySchedulingID(schedulingIDs []interface{}) error

func (*PersistentReconciliationRepository) UpdateComponentOperationProcessingDuration

func (r *PersistentReconciliationRepository) UpdateComponentOperationProcessingDuration(schedulingID, correlationID string, processingDuration int) error

func (*PersistentReconciliationRepository) UpdateOperationPickedUp

func (r *PersistentReconciliationRepository) UpdateOperationPickedUp(schedulingID, correlationID string) error

func (*PersistentReconciliationRepository) UpdateOperationRetryID

func (r *PersistentReconciliationRepository) UpdateOperationRetryID(schedulingID, correlationID, retryID string) error

func (*PersistentReconciliationRepository) UpdateOperationState

func (r *PersistentReconciliationRepository) UpdateOperationState(schedulingID, correlationID string, state model.OperationState, allowInState bool, reasons ...string) error

func (*PersistentReconciliationRepository) WithTx

type Repository

type Repository interface {
	CreateReconciliation(state *cluster.State, cfg *model.ReconciliationSequenceConfig) (*model.ReconciliationEntity, error)
	RemoveReconciliationByRuntimeID(runtimeID string) error
	RemoveReconciliationBySchedulingID(schedulingID string) error
	RemoveReconciliationsBySchedulingID(schedulingIDs []interface{}) error
	RemoveReconciliationsBeforeDeadline(runtimeID string, latestSchedulingID string, deadline time.Time) error
	GetReconciliation(schedulingID string) (*model.ReconciliationEntity, error)
	GetReconciliations(filter Filter) ([]*model.ReconciliationEntity, error)
	GetRuntimeIDs() ([]string, error)
	FinishReconciliation(schedulingID string, status *model.ClusterStatusEntity) error
	GetOperations(filter operation.Filter) ([]*model.OperationEntity, error)
	GetOperation(schedulingID, correlationID string) (*model.OperationEntity, error)
	//GetProcessableOperations returns all operations which can be assigned to a worker
	GetProcessableOperations(maxParallelOpsPerRecon int) ([]*model.OperationEntity, error)
	//GetReconcilingOperations returns all operations which are part of currently running reconciliations
	GetReconcilingOperations() ([]*model.OperationEntity, error)
	UpdateOperationState(schedulingID, correlationID string, state model.OperationState, allowInState bool, reasons ...string) error
	WithTx(tx *db.TxConnection) (Repository, error)
	UpdateOperationRetryID(schedulingID, correlationID, retryID string) error
	UpdateOperationPickedUp(schedulingID, correlationID string) error
	UpdateComponentOperationProcessingDuration(schedulingID, correlationID string, processingDuration int) error
	GetComponentOperationProcessingDuration(component string, state model.OperationState) (int64, error)
	GetMothershipOperationProcessingDuration(component string, state model.OperationState, startTime metricStartTime) (int64, error)
	GetAllComponents() ([]string, error)
	EnableDebugLogging(schedulingID string, correlationID ...string) error
}

func NewInMemoryReconciliationRepository

func NewInMemoryReconciliationRepository() Repository

func NewPersistedReconciliationRepository

func NewPersistedReconciliationRepository(conn db.Connection, debug bool) (Repository, error)

type WithClusterConfigStatus

type WithClusterConfigStatus struct {
	ClusterConfigStatus int64
}

func (*WithClusterConfigStatus) FilterByInstance

func (*WithClusterConfigStatus) FilterByQuery

func (wc *WithClusterConfigStatus) FilterByQuery(q *db.Select) error

type WithCreationDateAfter

type WithCreationDateAfter struct {
	Time time.Time
}

func (*WithCreationDateAfter) FilterByInstance

func (*WithCreationDateAfter) FilterByQuery

func (wd *WithCreationDateAfter) FilterByQuery(q *db.Select) error

type WithCreationDateBefore

type WithCreationDateBefore struct {
	Time time.Time
}

func (*WithCreationDateBefore) FilterByInstance

func (*WithCreationDateBefore) FilterByQuery

func (wd *WithCreationDateBefore) FilterByQuery(q *db.Select) error

type WithNotSchedulingID

type WithNotSchedulingID struct {
	SchedulingID string
}

func (*WithNotSchedulingID) FilterByInstance

func (*WithNotSchedulingID) FilterByQuery

func (wns *WithNotSchedulingID) FilterByQuery(q *db.Select) error

type WithRuntimeID

type WithRuntimeID struct {
	RuntimeID string
}

func (*WithRuntimeID) FilterByInstance

func (*WithRuntimeID) FilterByQuery

func (wc *WithRuntimeID) FilterByQuery(q *db.Select) error

type WithRuntimeIDs

type WithRuntimeIDs struct {
	RuntimeIDs []string
}

func (*WithRuntimeIDs) FilterByInstance

func (*WithRuntimeIDs) FilterByQuery

func (wc *WithRuntimeIDs) FilterByQuery(q *db.Select) error

type WithSchedulingID

type WithSchedulingID struct {
	SchedulingID string
}

func (*WithSchedulingID) FilterByInstance

func (*WithSchedulingID) FilterByQuery

func (ws *WithSchedulingID) FilterByQuery(q *db.Select) error

type WithStatuses

type WithStatuses struct {
	Statuses []string
}

func (*WithStatuses) FilterByInstance

func (*WithStatuses) FilterByQuery

func (ws *WithStatuses) FilterByQuery(q *db.Select) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL