Documentation ¶
Overview ¶
Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
See original design discussion: https://gitlab.com/gitlab-org/gitaly/issues/1495
Index ¶
- Constants
- Variables
- func CheckPostgresVersion(db *sql.DB) error
- func CountUnavailableRepositories(ctx context.Context, db glsql.Querier, virtualStorages []string) (map[string]int, error)
- func MigrateDown(conf config.Config, max int) (int, error)
- func MigrateDownPlan(conf config.Config, max int) ([]string, error)
- func MigrateStatus(conf config.Config) (map[string]*MigrationStatusRow, error)
- type AcknowledgeParams
- type AssignmentStore
- type CachingConsistentStoragesGetter
- func (c *CachingConsistentStoragesGetter) Collect(collector chan<- prometheus.Metric)
- func (c *CachingConsistentStoragesGetter) Connected()
- func (c *CachingConsistentStoragesGetter) Describe(descs chan<- *prometheus.Desc)
- func (c *CachingConsistentStoragesGetter) Disconnect(error)
- func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
- func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification)
- type ChangeType
- type ClusterPath
- type ConsistentStoragesGetter
- type DequeParams
- type DowngradeAttemptedError
- type InvalidArgumentError
- type JobState
- type Listener
- type MigrationStatusRow
- type MockReplicationEventQueue
- type MockRepositoryStore
- func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, ...) error
- func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
- func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
- func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
- func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
- func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
- func (m MockRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
- func (m MockRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
- func (m MockRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
- func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
- func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
- func (m MockRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
- func (m MockRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
- func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
- func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
- func (m MockRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (m MockRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, ...) error
- type Params
- type PostgresReplicationEventQueue
- func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
- func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)
- func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
- func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
- func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
- type PostgresRepositoryStore
- func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, ...) error
- func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error
- func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
- func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
- func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
- func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
- func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
- func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
- func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
- func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
- func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
- func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
- func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
- func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
- func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
- func (rs *PostgresRepositoryStore) MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
- func (rs *PostgresRepositoryStore) MarkUnverified(ctx context.Context, repositoryID int64) (int64, error)
- func (rs *PostgresRepositoryStore) MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
- func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
- func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storageName string) error
- func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, ...) error
- type QueueDepthCollector
- type Replica
- type ReplicationEvent
- type ReplicationEventExistsError
- type ReplicationEventQueue
- type ReplicationEventQueueInterceptor
- func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
- func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)
- func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
- func (i *ReplicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
- func (i *ReplicationEventQueueInterceptor) GetAcknowledge() []AcknowledgeParams
- func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64
- func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams
- func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent
- func (i *ReplicationEventQueueInterceptor) GetEnqueued() []ReplicationEvent
- func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent
- func (i *ReplicationEventQueueInterceptor) OnAcknowledge(...)
- func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) (int64, error))
- func (i *ReplicationEventQueueInterceptor) OnDequeue(...)
- func (i *ReplicationEventQueueInterceptor) OnEnqueue(...)
- func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error)
- func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
- func (i *ReplicationEventQueueInterceptor) Wait(deadline time.Duration, ...) error
- type ReplicationJob
- type RepositoryMetadata
- type RepositoryStore
- type RepositoryStoreCollector
- type ResilientListener
- type StorageCleanup
- func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error)
- func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]string, error)
- func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage string) error
- type VerificationQueueDepthCollector
Constants ¶
const ( // JobStateReady indicates the job is now ready to proceed. JobStateReady = JobState("ready") // JobStateInProgress indicates the job is being processed by a worker. JobStateInProgress = JobState("in_progress") // JobStateCompleted indicates the job is now complete. JobStateCompleted = JobState("completed") // JobStateFailed indicates the job did not succeed. The Replicator will retry // failed jobs. JobStateFailed = JobState("failed") // JobStateDead indicates the job was retried up to the maximum retries. JobStateDead = JobState("dead") )
const ( // UpdateRepo is when a replication updates a repository in place UpdateRepo = ChangeType("update") // CreateRepo is when a replication creates a repo CreateRepo = ChangeType("create") // DeleteRepo is when a replication deletes a repo DeleteRepo = ChangeType("delete") // DeleteReplica change type indicates that the targeted replica is due for deletion. DeleteReplica = ChangeType("delete_replica") )
Any fields added here should also be added below to GetAllChangeTypes
const ( // StorageRepositoriesUpdatesChannel is a name of the database event channel // used to send events with changes done to 'storage_repositories' table. StorageRepositoriesUpdatesChannel = "storage_repositories_updates" // RepositoriesUpdatesChannel is a name of the database event channel // used to send events with changes done to 'repositories' table. RepositoriesUpdatesChannel = "repositories_updates" )
const CorrelationIDKey = "correlation_id"
CorrelationIDKey is the key that is used to store the correlation ID for a specific replication job as part of the parameters.
const GenerationUnknown = -1
GenerationUnknown is used to indicate lack of generation number in a replication job. Older instances can produce replication jobs without a generation number.
Variables ¶
var ( // ErrNoRowsAffected is returned when a query did not perform any changes. ErrNoRowsAffected = errors.New("no rows were affected by the query") // ErrRepositoryAlreadyExists is returned when trying to insert a repository into the datastore that already // exists. ErrRepositoryAlreadyExists = errors.New("repository already exists") // ErrRepositoryNotFound is returned when looking up a repository that does not exist in the datastore. ErrRepositoryNotFound = errors.New("repository not found") )
Functions ¶
func CheckPostgresVersion ¶
CheckPostgresVersion checks the server version of the Postgres DB specified in conf. This is a diagnostic for the Praefect Postgres rollout. https://gitlab.com/gitlab-org/gitaly/issues/1755
func CountUnavailableRepositories ¶
func CountUnavailableRepositories(ctx context.Context, db glsql.Querier, virtualStorages []string) (map[string]int, error)
CountUnavailableRepositories queries the number of unavailable repositories from the database. A repository is unavailable when it has no replicas that can act as a primary, indicating they are either unhealthy or out of date.
func MigrateDown ¶
MigrateDown rolls back at most max migrations.
func MigrateDownPlan ¶
MigrateDownPlan does a dry run for rolling back at most max migrations.
func MigrateStatus ¶
func MigrateStatus(conf config.Config) (map[string]*MigrationStatusRow, error)
MigrateStatus returns the status of database migrations. The key of the map indexes the migration ID.
Types ¶
type AcknowledgeParams ¶
AcknowledgeParams is the list of parameters used for Acknowledge method call.
type AssignmentStore ¶
type AssignmentStore struct {
// contains filtered or unexported fields
}
AssignmentStore manages host assignments in Postgres.
func NewAssignmentStore ¶
func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore
NewAssignmentStore returns a new AssignmentStore using the passed in database.
func (AssignmentStore) GetHostAssignments ¶
func (AssignmentStore) SetReplicationFactor ¶
func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. Please see the protobuf documentation of the method for details.
type CachingConsistentStoragesGetter ¶
type CachingConsistentStoragesGetter struct {
// contains filtered or unexported fields
}
CachingConsistentStoragesGetter is a ConsistentStoragesGetter that caches up to date storages by repository. Each virtual storage has it's own cache that invalidates entries based on notifications.
func NewCachingConsistentStoragesGetter ¶
func NewCachingConsistentStoragesGetter(logger log.Logger, csg ConsistentStoragesGetter, virtualStorages []string) (*CachingConsistentStoragesGetter, error)
NewCachingConsistentStoragesGetter returns a ConsistentStoragesGetter that uses caching.
func (*CachingConsistentStoragesGetter) Collect ¶
func (c *CachingConsistentStoragesGetter) Collect(collector chan<- prometheus.Metric)
Collect collects all metrics.
func (*CachingConsistentStoragesGetter) Connected ¶
func (c *CachingConsistentStoragesGetter) Connected()
Connected enables the cache when it has been connected to Postgres.
func (*CachingConsistentStoragesGetter) Describe ¶
func (c *CachingConsistentStoragesGetter) Describe(descs chan<- *prometheus.Desc)
Describe returns all metric descriptors.
func (*CachingConsistentStoragesGetter) Disconnect ¶
func (c *CachingConsistentStoragesGetter) Disconnect(error)
Disconnect disables the caching when connection to Postgres has been lost.
func (*CachingConsistentStoragesGetter) GetConsistentStorages ¶
func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
func (*CachingConsistentStoragesGetter) Notification ¶
func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification)
Notification handles notifications by invalidating cache entries of updated repositories.
type ChangeType ¶
type ChangeType string
ChangeType indicates what kind of change the replication is propagating
func GetAllChangeTypes ¶
func GetAllChangeTypes() []ChangeType
GetAllChangeTypes is used to define and provide all the various ChangeType constants. This is useful to iterate over and set labels in metrics.
func (ChangeType) String ¶
func (ct ChangeType) String() string
type ClusterPath ¶
type ClusterPath struct { // VirtualStorage is the name of the virtual storage. VirtualStorage string // Storage is the name of the gitaly storage. Storage string }
ClusterPath represents path on the cluster to the storage.
type ConsistentStoragesGetter ¶
type ConsistentStoragesGetter interface { // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) }
ConsistentStoragesGetter returns storages which contain the latest generation of a repository.
type DequeParams ¶
DequeParams is the list of parameters used for Dequeue method call.
type DowngradeAttemptedError ¶
type DowngradeAttemptedError struct { Storage string CurrentGeneration int AttemptedGeneration int }
DowngradeAttemptedError is returned when attempting to get the replicated generation for a source repository that does not upgrade the target repository.
func (DowngradeAttemptedError) Error ¶
func (err DowngradeAttemptedError) Error() string
type InvalidArgumentError ¶
type InvalidArgumentError struct {
// contains filtered or unexported fields
}
InvalidArgumentError tags the error as being caused by an invalid argument.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener is designed to listen for PostgreSQL database NOTIFY events. It connects to the database with Listen method and starts to listen for events.
func NewListener ¶
NewListener returns a listener that is ready to listen for PostgreSQL notifications.
func (*Listener) Listen ¶
func (l *Listener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error
Listen starts listening for the events. Each event is passed to the handler for processing. Listen is a blocking call, it returns in case context is cancelled or an error occurs while receiving notifications from the database.
type MigrationStatusRow ¶
MigrationStatusRow represents an entry in the schema migrations table. If the migration is in the database but is not listed, Unknown will be true.
type MockReplicationEventQueue ¶
type MockReplicationEventQueue struct { ReplicationEventQueue EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error) }
MockReplicationEventQueue is a helper for tests that implements ReplicationEventQueue and allows for parametrizing behavior.
func (*MockReplicationEventQueue) Enqueue ¶
func (m *MockReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
type MockRepositoryStore ¶
type MockRepositoryStore struct { RepositoryStore GetGenerationFunc func(ctx context.Context, repositoryID int64, storage string) (int, error) IncrementGenerationFunc func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, repositoryID int64, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) GetReplicaPathFunc func(ctx context.Context, repositoryID int64) (string, error) GetRepositoryMetadataFunc func(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) GetRepositoryMetadataByPathFunc func(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) }
MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods default to what could be considered success if not set.
func (MockRepositoryStore) CreateRepository ¶
func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error.
func (MockRepositoryStore) DeleteInvalidRepository ¶
func (MockRepositoryStore) DeleteReplica ¶
func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
DeleteReplica runs the mock's DeleteReplicaFunc.
func (MockRepositoryStore) DeleteRepository ¶
func (MockRepositoryStore) GetConsistentStorages ¶
func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map.
func (MockRepositoryStore) GetConsistentStoragesByRepositoryID ¶
func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map.
func (MockRepositoryStore) GetGeneration ¶
func (MockRepositoryStore) GetPartiallyAvailableRepositories ¶
func (m MockRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
GetPartiallyAvailableRepositories returns the result of GetPartiallyAvailableRepositories or nil if it is unset.
func (MockRepositoryStore) GetReplicaPath ¶
func (m MockRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
GetReplicaPath returns the result of GetReplicaPathFunc or panics if it is unset.
func (MockRepositoryStore) GetReplicatedGeneration ¶
func (MockRepositoryStore) GetRepositoryID ¶
func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
GetRepositoryID returns the result of GetRepositoryIDFunc or 0 if it is unset.
func (MockRepositoryStore) GetRepositoryMetadata ¶
func (m MockRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
GetRepositoryMetadata returns the result of GetRepositoryMetadataFunc or panics if it is unset.
func (MockRepositoryStore) GetRepositoryMetadataByPath ¶
func (m MockRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
GetRepositoryMetadataByPath returns the result of GetRepositoryMetadataByPathFunc or panics if it is unset.
func (MockRepositoryStore) IncrementGeneration ¶
func (MockRepositoryStore) RepositoryExists ¶
func (MockRepositoryStore) ReserveRepositoryID ¶
func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
ReserveRepositoryID returns the result of ReserveRepositoryIDFunc or 0 if it is unset.
func (MockRepositoryStore) SetAuthoritativeReplica ¶
func (m MockRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error.
func (MockRepositoryStore) SetGeneration ¶
type Params ¶
type Params map[string]interface{}
Params represent additional information required to process event after fetching it from storage. It must be JSON encodable/decodable to persist it without problems.
func (Params) GetBool ¶
GetBool returns the boolean parameter associated with the given key. Returns an error if either the key does not exist, or if the value is not a bool.
type PostgresReplicationEventQueue ¶
type PostgresReplicationEventQueue struct {
// contains filtered or unexported fields
}
PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
func NewPostgresReplicationEventQueue ¶
func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue
NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage.
func (PostgresReplicationEventQueue) Acknowledge ¶
func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
Acknowledge is used to delete events which have dequeue'd and are completed|dead. When `Acknowledge` method is called:
- The list of event `id`s and corresponding <lock>s retrieved from `replication_queue` table as passed in by the user `ids` could not exist in the table or the `state` of the event could differ from `in_progress` (it is possible to acknowledge only events previously fetched by the `Dequeue` method)
- Based on the list fetched on previous step the delete is executed on the `replication_queue` table. In case the new state for the entry is 'dead' it will be just deleted, but if the new state is 'completed' the event will be delete as well, but all events similar to it (events for the same repository with same change type and a source) that were created before processed events were queued for processing will also be deleted. In case the new state is something different ('failed') the event will be updated only with a new state. It returns a list of event `id`s and corresponding <lock>s of the affected events during this delete/update process.
- The removal of records in `replication_queue_job_lock` table happens that were created by step 4. of `Dequeue` method call.
- Acquisition state of <lock>s in `replication_queue_lock` table updated by comparing amount of existing bindings in `replication_queue_lock` table for the <lock> to amount of removed bindings done on the 3. for the <lock> and if amount is the same the <lock> is free and column `acquired` assigned `FALSE` value, so this <lock> can be used in the `Dequeue` method to retrieve other events. If amounts don't match no update happens and <lock> remains acquired until all events are acknowledged (binding records removed from the `replication_queue_job_lock` table).
func (PostgresReplicationEventQueue) AcknowledgeStale ¶
func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)
AcknowledgeStale moves replication events that are 'in_progress' state for too long (more then staleAfter) into the next state:
'failed' - in case it has more attempts to be executed 'dead' - in case it has no more attempts to be executed
The job considered 'in_progress' if it has corresponding entry in the 'replication_queue_job_lock' table. When moving from 'in_progress' to other state the entry from 'replication_queue_job_lock' table will be removed and entry in the 'replication_queue_lock' will be updated if needed (release of the lock).
func (PostgresReplicationEventQueue) Dequeue ¶
func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
Dequeue is used to pop events from the queue matching specific filters When `Dequeue` method is called:
Events with attempts left that are either in `ready` or `failed` state are candidates for dequeuing. Events already being processed by another worker are filtered out by checking if the event is already locked in the `replication_queue_job_lock` table.
Events for repositories that are already locked by another Praefect instance are filtered out. Repository locks are stored in the `replication_queue_lock` table.
The events that still remain after filtering are dequeued. On dequeuing: - The event's attempts are decremented by 1. - The event's state is set to `in_progress` - The event's `updated_at` is set to current time in UTC.
For each event retrieved from the step above a new record would be created in `replication_queue_job_lock` table. Rows in this table allows us to track events that were fetched for processing and relation of them with the locks in the `replication_queue_lock` table. The reason we need it is because multiple events can be fetched for the same repository (more details on it in `Acknowledge` below).
Update the corresponding <lock> in `replication_queue_lock` table and column `acquired` is assigned with `TRUE` value to signal that this <lock> is busy and can't be used to fetch events (step 2.).
As a special case, 'delete_replica' type events have unlimited attempts. This is to ensure we never partially apply the job by deleting the repository from the disk but leaving it still present in the database. Praefect would then see that there still is a replica on the storage, when there is none in fact. That could cause us to delete all replicas of a repository.
func (PostgresReplicationEventQueue) Enqueue ¶
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
Enqueue puts the provided event into the persistent queue. When `Enqueue` method is called:
- Insertion of the new record into `replication_queue_lock` table, so we are ensured all events have a corresponding <lock>. If a record already exists it won't be inserted again.
- Insertion of the new record into the `replication_queue` table with the defaults listed above, the job, the meta and corresponding <lock> used in `replication_queue_lock` table for the `lock_id` column.
func (PostgresReplicationEventQueue) StartHealthUpdate ¶
func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
StartHealthUpdate starts periodical update of the event's health identifier. The events with fresh health identifier won't be considered as stale. The health update will be executed on each new entry received from trigger channel passed in. It is a blocking call that is managed by the passed in context.
type PostgresRepositoryStore ¶
type PostgresRepositoryStore struct {
// contains filtered or unexported fields
}
PostgresRepositoryStore is a Postgres implementation of RepositoryStore. Refer to the interface for method documentation.
func NewPostgresRepositoryStore ¶
func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore
NewPostgresRepositoryStore returns a Postgres implementation of RepositoryStore.
func (*PostgresRepositoryStore) CreateRepository ¶
func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
CreateRepository creates a record for a repository in the specified virtual storage and relative path. Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the store.
storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as the repository's primary.
storeAssignments should be set when variable replication factor is enabled. When set, the primary and the secondaries are stored as the assigned hosts of the repository.
func (*PostgresRepositoryStore) DeleteAllRepositories ¶
func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error
func (*PostgresRepositoryStore) DeleteInvalidRepository ¶
func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
DeleteInvalidRepository deletes the given replica. If the replica was the only replica of the repository, then the repository will be deleted, as well.
func (*PostgresRepositoryStore) DeleteReplica ¶
func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details.
func (*PostgresRepositoryStore) DeleteRepository ¶
func (*PostgresRepositoryStore) GetConsistentStorages ¶
func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
func (*PostgresRepositoryStore) GetConsistentStoragesByRepositoryID ¶
func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
func (*PostgresRepositoryStore) GetGeneration ¶
func (*PostgresRepositoryStore) GetPartiallyAvailableRepositories ¶
func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which are not able to serve requests at the moment.
func (*PostgresRepositoryStore) GetReplicaPath ¶
func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
GetReplicaPath gets the replica path of a repository. Returns a ErrRepositoryNotFound if a record for the repository ID is not found.
func (*PostgresRepositoryStore) GetReplicatedGeneration ¶
func (*PostgresRepositoryStore) GetRepositoryID ¶
func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a ErrRepositoryNotFound error if the repository doesn't exist.
func (*PostgresRepositoryStore) GetRepositoryMetadata ¶
func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
GetRepositoryMetadata retrieves a repository's metadata.
func (*PostgresRepositoryStore) GetRepositoryMetadataByPath ¶
func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path.
func (*PostgresRepositoryStore) IncrementGeneration ¶
func (*PostgresRepositoryStore) MarkStorageUnverified ¶
func (rs *PostgresRepositoryStore) MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
MarkStorageUnverified marsk all replicas on the storage as unverified.
func (*PostgresRepositoryStore) MarkUnverified ¶
func (rs *PostgresRepositoryStore) MarkUnverified(ctx context.Context, repositoryID int64) (int64, error)
MarkUnverified marks replicas of the repository unverified.
func (*PostgresRepositoryStore) MarkVirtualStorageUnverified ¶
func (rs *PostgresRepositoryStore) MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified.
func (*PostgresRepositoryStore) RepositoryExists ¶
func (*PostgresRepositoryStore) ReserveRepositoryID ¶
func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already exists with the given virtual storage and relative path combination, an error is returned.
func (*PostgresRepositoryStore) SetAuthoritativeReplica ¶
func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storageName string) error
SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one.
func (*PostgresRepositoryStore) SetGeneration ¶
type QueueDepthCollector ¶
type QueueDepthCollector struct {
// contains filtered or unexported fields
}
QueueDepthCollector collects metrics describing replication queue depths
func NewQueueDepthCollector ¶
func NewQueueDepthCollector(log log.Logger, db glsql.Querier, timeout time.Duration) *QueueDepthCollector
NewQueueDepthCollector returns a new QueueDepthCollector
func (*QueueDepthCollector) Collect ¶
func (q *QueueDepthCollector) Collect(ch chan<- prometheus.Metric)
Collect collects metrics describing the replication queue depth
func (*QueueDepthCollector) Describe ¶
func (q *QueueDepthCollector) Describe(ch chan<- *prometheus.Desc)
type Replica ¶
type Replica struct { // Storage is the name of the replica's storage. Storage string // Generation is the replica's confirmed generation. If the replica does not yet exists, generation // is -1. Generation int64 // Assigned indicates whether the storage is an assigned host of the repository. Assigned bool // Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes. Healthy bool // ValidPrimary indicates whether the replica is ready to serve as the primary if necessary. ValidPrimary bool // VerifiedAt is the last successful verification time of the replica. VerifiedAt time.Time }
Replica represents a replica of a repository.
type ReplicationEvent ¶
type ReplicationEvent struct { ID uint64 State JobState Attempt int LockID string CreatedAt time.Time UpdatedAt *time.Time Job ReplicationJob Meta Params }
ReplicationEvent is a persistent representation of the replication event.
func (*ReplicationEvent) Mapping ¶
func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error)
Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
type ReplicationEventExistsError ¶
type ReplicationEventExistsError struct {
// contains filtered or unexported fields
}
ReplicationEventExistsError is returned when trying to add an already existing replication event into the queue.
func (ReplicationEventExistsError) Error ¶
func (err ReplicationEventExistsError) Error() string
Error returns the errors message.
type ReplicationEventQueue ¶
type ReplicationEventQueue interface { // Enqueue puts provided event into the persistent queue. Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) // Dequeue retrieves events from the persistent queue using provided limitations and filters. Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. // It updates events that are in 'in_progress' state to the state that is passed in. // It also updates state of similar events (scheduled for the same repository with same change from the same source) // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is // 'completed'. Otherwise it won't be changed. // It returns sub-set of passed in ids that were updated. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) // StartHealthUpdate starts periodical update of the event's health identifier. // The events with fresh health identifier won't be considered as stale. // The health update will be executed on each new entry received from trigger channel passed in. // It is a blocking call that is managed by the passed in context. StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error // AcknowledgeStale moves replication events that are 'in_progress' state for too long (more than staleAfter) // into the next state: // 'failed' - in case it has more attempts to be executed // 'dead' - in case it has no more attempts to be executed AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error) }
ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
func NewMemoryReplicationEventQueue ¶
func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue
NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue.
type ReplicationEventQueueInterceptor ¶
type ReplicationEventQueueInterceptor struct { ReplicationEventQueue // contains filtered or unexported fields }
ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. It also provides additional methods to get info about incoming and outgoing data from the underling queue. NOTE: it should be used for testing purposes only as it persists data in memory and doesn't clean it up.
func NewReplicationEventQueueInterceptor ¶
func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) *ReplicationEventQueueInterceptor
NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
func (*ReplicationEventQueueInterceptor) Acknowledge ¶
func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
Acknowledge intercepts call to the Acknowledge method of the underling implementation or a call back. It populates storage of incoming and outgoing parameters before and after method call.
func (*ReplicationEventQueueInterceptor) AcknowledgeStale ¶
func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)
AcknowledgeStale intercepts call to the AcknowledgeStale method of the underling implementation or a call back.
func (*ReplicationEventQueueInterceptor) Dequeue ¶
func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
Dequeue intercepts call to the Dequeue method of the underling implementation or a call back. It populates storage of incoming and outgoing parameters before and after method call.
func (*ReplicationEventQueueInterceptor) Enqueue ¶
func (i *ReplicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
Enqueue intercepts call to the Enqueue method of the underling implementation or a call back. It populates storage of incoming and outgoing parameters before and after method call.
func (*ReplicationEventQueueInterceptor) GetAcknowledge ¶
func (i *ReplicationEventQueueInterceptor) GetAcknowledge() []AcknowledgeParams
GetAcknowledge returns a list of parameters used for Acknowledge method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) GetAcknowledgeResult ¶
func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64
GetAcknowledgeResult returns a list of results returned after Acknowledge method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) GetDequeued ¶
func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams
GetDequeued returns a list of parameters used for Dequeue method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) GetDequeuedResult ¶
func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent
GetDequeuedResult returns a list of events returned after Dequeue method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) GetEnqueued ¶
func (i *ReplicationEventQueueInterceptor) GetEnqueued() []ReplicationEvent
GetEnqueued returns a list of events used for Enqueue method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) GetEnqueuedResult ¶
func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent
GetEnqueuedResult returns a list of events returned by Enqueue method or a call-back invocation.
func (*ReplicationEventQueueInterceptor) OnAcknowledge ¶
func (i *ReplicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called.
func (*ReplicationEventQueueInterceptor) OnAcknowledgeStale ¶
func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) (int64, error))
OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called.
func (*ReplicationEventQueueInterceptor) OnDequeue ¶
func (i *ReplicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error))
OnDequeue allows to set action that would be executed each time when `Dequeue` method called.
func (*ReplicationEventQueueInterceptor) OnEnqueue ¶
func (i *ReplicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error))
OnEnqueue allows to set action that would be executed each time when `Enqueue` method called.
func (*ReplicationEventQueueInterceptor) OnStartHealthUpdate ¶
func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error)
OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called.
func (*ReplicationEventQueueInterceptor) StartHealthUpdate ¶
func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
StartHealthUpdate intercepts call to the StartHealthUpdate method of the underling implementation or a call back.
func (*ReplicationEventQueueInterceptor) Wait ¶
func (i *ReplicationEventQueueInterceptor) Wait(deadline time.Duration, condition func(i *ReplicationEventQueueInterceptor) bool) error
Wait checks the condition in a loop with await until it returns true or deadline is exceeded. The error is returned only in case the deadline is exceeded.
type ReplicationJob ¶
type ReplicationJob struct { // RepositoryID is the ID of the repository this job relates to. RepositoryID // may be 0 if the job doesn't relate to any known repository. This can happen // for example when the job is deleting an orphaned replica of a deleted repository. RepositoryID int64 `json:"repository_id"` // ReplicaPath is the relative path where the replicas are stored in the Gitaly storages. ReplicaPath string `json:"replica_path"` Change ChangeType `json:"change"` // RelativePath is the virtual relative path the client uses to access the repository on the // virtual storage. The actual path that is used to store the repository on the disks is the // ReplicaPath. This can be removed in the future but is still carried in the jobs as the // replication queue locking depends on this. RelativePath string `json:"relative_path"` TargetNodeStorage string `json:"target_node_storage"` SourceNodeStorage string `json:"source_node_storage"` VirtualStorage string `json:"virtual_storage"` Params Params `json:"params"` }
ReplicationJob is a persistent representation of the replication job.
func (*ReplicationJob) Scan ¶
func (job *ReplicationJob) Scan(value interface{}) error
Scan a value of json data into the ReplicationJob
type RepositoryMetadata ¶
type RepositoryMetadata struct { // RepositoryID is the internal id of the repository. RepositoryID int64 // VirtualStorage is the virtual storage where the repository is. VirtualStorage string // RelativePath is the relative path of the repository. RelativePath string // ReplicaPath is the actual disk location where the replicas are stored in the storages. ReplicaPath string // Primary is the current primary of this repository. Primary string // Generation is the current generation of the repository. Generation int64 // Replicas contains information of the repository on each storage that contains the repository // or does not contain the repository but is assigned to host it. Replicas []Replica }
RepositoryMetadata contains the repository's metadata.
type RepositoryStore ¶
type RepositoryStore interface { // GetGeneration gets the repository's generation on a given storage. GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) // IncrementGeneration increments the generations of up to date nodes. IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error // SetGeneration sets the repository's generation on the given storage. If the generation is higher // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments. SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error // GetReplicaPath gets the replica path of a repository. Returns a ErrRepositoryNotFound if a record // for the repository ID is not found. GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) // GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would // downgrade, a DowngradeAttemptedError is returned. GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) // CreateRepository creates a record for a repository in the specified virtual storage and relative path. // Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated // and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed // the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the store. // // storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as // the repository's primary. // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages // which are known to have a replica at the time of deletion. ErrRepositoryNotFound is returned when // the repository is not tracked by the Praefect datastore. DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) // DeleteAllRepositories deletes the database records associated with // repositories in the specified virtual storage. DeleteAllRepositories(ctx context.Context, virtualStorage string) error // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. DeleteReplica(ctx context.Context, repositoryID int64, storage string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) // GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which // are not able to serve requests at the moment. GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) // DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already // exists with the given virtual storage and relative path combination, an error is returned. ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) // GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a // ErrRepositoryNotFound error if the repository doesn't exist. GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) // GetRepositoryMetadata retrieves a repository's metadata. GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) // GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path. GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) // MarkUnverified marks replicas of the repository unverified. MarkUnverified(ctx context.Context, repositoryID int64) (int64, error) // MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified. MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marsk all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) }
RepositoryStore provides access to repository state.
type RepositoryStoreCollector ¶
type RepositoryStoreCollector struct {
// contains filtered or unexported fields
}
RepositoryStoreCollector collects metrics from the RepositoryStore.
func NewRepositoryStoreCollector ¶
func NewRepositoryStoreCollector( log log.Logger, virtualStorages []string, db glsql.Querier, timeout time.Duration, ) *RepositoryStoreCollector
NewRepositoryStoreCollector returns a new collector.
func (*RepositoryStoreCollector) Collect ¶
func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric)
func (*RepositoryStoreCollector) Describe ¶
func (c *RepositoryStoreCollector) Describe(ch chan<- *prometheus.Desc)
type ResilientListener ¶
type ResilientListener struct {
// contains filtered or unexported fields
}
ResilientListener allows listen for notifications resiliently.
func NewResilientListener ¶
func NewResilientListener(conf config.DB, ticker helper.Ticker, logger log.Logger) *ResilientListener
NewResilientListener returns instance of the *ResilientListener.
func (*ResilientListener) Collect ¶
func (rl *ResilientListener) Collect(metrics chan<- promclient.Metric)
Collect returns set of metrics collected during execution.
func (*ResilientListener) Describe ¶
func (rl *ResilientListener) Describe(descs chan<- *promclient.Desc)
Describe return description of the metric.
func (*ResilientListener) Listen ¶
func (rl *ResilientListener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error
Listen starts a new Listener and listens for the notifications on the channels. If error occurs and connection is closed/terminated another Listener is created after some await period. The method returns only when provided context is cancelled or invalid configuration is used.
type StorageCleanup ¶
type StorageCleanup struct {
// contains filtered or unexported fields
}
StorageCleanup provides methods on the database for the repository cleanup operation.
func NewStorageCleanup ¶
func NewStorageCleanup(db *sql.DB) *StorageCleanup
NewStorageCleanup initialises and returns a new instance of the StorageCleanup.
func (*StorageCleanup) AcquireNextStorage ¶
func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error)
AcquireNextStorage picks up the next storage for processing. Once acquired no other call to the same method will return the same storage, so it works as exclusive lock on that entry. Once processing is done the returned function needs to be called to release acquired storage. It updates last_run column of the entry on execution.
func (*StorageCleanup) DoesntExist ¶
func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]string, error)
DoesntExist returns replica path for each repository that doesn't exist in the database by querying repositories and storage_repositories tables.
type VerificationQueueDepthCollector ¶
type VerificationQueueDepthCollector struct {
// contains filtered or unexported fields
}
VerificationQueueDepthCollector collects the verification queue depth metric from the database.
func NewVerificationQueueDepthCollector ¶
func NewVerificationQueueDepthCollector(log log.Logger, db glsql.Querier, timeout, verificationInterval time.Duration, configuredStorages map[string][]string) *VerificationQueueDepthCollector
NewVerificationQueueDepthCollector returns a new VerificationQueueDepthCollector
func (*VerificationQueueDepthCollector) Collect ¶
func (c *VerificationQueueDepthCollector) Collect(ch chan<- prometheus.Metric)
Collect collects the verification queue depth metric from the database.
func (*VerificationQueueDepthCollector) Describe ¶
func (c *VerificationQueueDepthCollector) Describe(ch chan<- *prometheus.Desc)
Describe describes the collected metrics to Prometheus.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
|
Package advisorylock contains the lock IDs of all advisory locks used in Praefect. |
Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.
|
Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries. |