Documentation ¶
Overview ¶
Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
See original design discussion: https://gitlab.com/gitlab-org/gitaly/issues/1495
Index ¶
- Constants
- Variables
- func CheckPostgresVersion(db *sql.DB) error
- func MigrateDown(conf config.Config, max int) (int, error)
- func MigrateDownPlan(conf config.Config, max int) ([]string, error)
- func MigrateStatus(conf config.Config) (map[string]*MigrationStatusRow, error)
- type AssignmentStore
- type CachingConsistentStoragesGetter
- func (c *CachingConsistentStoragesGetter) Collect(collector chan<- prometheus.Metric)
- func (c *CachingConsistentStoragesGetter) Connected()
- func (c *CachingConsistentStoragesGetter) Describe(descs chan<- *prometheus.Desc)
- func (c *CachingConsistentStoragesGetter) Disconnect(error)
- func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
- func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification)
- type ChangeType
- type ConsistentStoragesGetter
- type DowngradeAttemptedError
- type InvalidArgumentError
- type JobState
- type MigrationStatusRow
- type MockReplicationEventQueue
- type MockRepositoryStore
- func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, ...) error
- func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (m MockRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
- func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
- func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error)
- func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
- func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, ...) error
- func (m MockRepositoryStore) RenameRepository(ctx context.Context, ...) error
- func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, ...) error
- type OutdatedRepository
- type OutdatedRepositoryStorageDetails
- type Params
- type PostgresListener
- type PostgresListenerOpts
- type PostgresReplicationEventQueue
- func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
- func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
- func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
- func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
- func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
- type PostgresRepositoryStore
- func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, ...) error
- func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
- func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
- func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error)
- func (rs *PostgresRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, useVirtualStoragePrimaries bool) ([]OutdatedRepository, error)
- func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error)
- func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, ...) error
- func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, ...) error
- func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
- func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, ...) error
- type ReplicationEvent
- type ReplicationEventQueue
- type ReplicationEventQueueInterceptor
- type ReplicationJob
- type RepositoryExistsError
- type RepositoryNotExistsError
- type RepositoryStore
- type RepositoryStoreCollector
Constants ¶
const ( // JobStateReady indicates the job is now ready to proceed. JobStateReady = JobState("ready") // JobStateInProgress indicates the job is being processed by a worker. JobStateInProgress = JobState("in_progress") // JobStateCompleted indicates the job is now complete. JobStateCompleted = JobState("completed") // JobStateCancelled indicates the job was cancelled. This can occur if the // job is no longer relevant (e.g. a node is moved out of a repository). JobStateCancelled = JobState("cancelled") // JobStateFailed indicates the job did not succeed. The Replicator will retry // failed jobs. JobStateFailed = JobState("failed") // JobStateDead indicates the job was retried up to the maximum retries. JobStateDead = JobState("dead") )
const ( // UpdateRepo is when a replication updates a repository in place UpdateRepo = ChangeType("update") // CreateRepo is when a replication creates a repo CreateRepo = ChangeType("create") // DeleteRepo is when a replication deletes a repo DeleteRepo = ChangeType("delete") // DeleteReplica change type indicates that the targeted replica is due for deletion. DeleteReplica = ChangeType("delete_replica") // RenameRepo is when a replication renames repo RenameRepo = ChangeType("rename") // GarbageCollect is when replication runs gc GarbageCollect = ChangeType("gc") // RepackFull is when replication runs a full repack RepackFull = ChangeType("repack_full") // RepackIncremental is when replication runs an incremental repack RepackIncremental = ChangeType("repack_incremental") // Cleanup is when replication runs a repo cleanup Cleanup = ChangeType("cleanup") // PackRefs is when replication optimizes references in a repo PackRefs = ChangeType("pack_refs") )
const GenerationUnknown = -1
GenerationUnknown is used to indicate lack of generation number in a replication job. Older instances can produce replication jobs without a generation number.
Variables ¶
var DefaultPostgresListenerOpts = PostgresListenerOpts{ PingPeriod: 10 * time.Second, MinReconnectInterval: 5 * time.Second, MaxReconnectInterval: 40 * time.Second, }
DefaultPostgresListenerOpts pre-defined options for PostgreSQL listener.
Functions ¶
func CheckPostgresVersion ¶
CheckPostgresVersion checks the server version of the Postgres DB that db is connected to. This is a diagnostic for the Praefect Postgres rollout. https://gitlab.com/gitlab-org/gitaly/issues/1755
func MigrateDown ¶
MigrateDown rolls back at most max migrations.
func MigrateDownPlan ¶
MigrateDownPlan does a dry run for rolling back at most max migrations.
func MigrateStatus ¶
func MigrateStatus(conf config.Config) (map[string]*MigrationStatusRow, error)
MigrateStatus returns the status of database migrations. The key of the map indexes the migration ID.
Types ¶
type AssignmentStore ¶
type AssignmentStore struct {
// contains filtered or unexported fields
}
AssignmentStore manages host assignments in Postgres.
func NewAssignmentStore ¶
func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore
NewAssignmentStore returns a new AssignmentStore using the passed in database.
func (AssignmentStore) GetHostAssignments ¶
func (AssignmentStore) SetReplicationFactor ¶
func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. Please see the protobuf documentation of the method for details.
type CachingConsistentStoragesGetter ¶
type CachingConsistentStoragesGetter struct {
// contains filtered or unexported fields
}
CachingConsistentStoragesGetter is a ConsistentStoragesGetter that caches up-to-date storages by repository. Each virtual storage has its own cache that invalidates entries based on notifications.
func NewCachingConsistentStoragesGetter ¶
func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg ConsistentStoragesGetter, virtualStorages []string) (*CachingConsistentStoragesGetter, error)
NewCachingConsistentStoragesGetter returns a ConsistentStoragesGetter that uses caching.
func (*CachingConsistentStoragesGetter) Collect ¶
func (c *CachingConsistentStoragesGetter) Collect(collector chan<- prometheus.Metric)
Collect collects all metrics.
func (*CachingConsistentStoragesGetter) Connected ¶
func (c *CachingConsistentStoragesGetter) Connected()
Connected enables the cache when it has been connected to Postgres.
func (*CachingConsistentStoragesGetter) Describe ¶
func (c *CachingConsistentStoragesGetter) Describe(descs chan<- *prometheus.Desc)
Describe returns all metric descriptors.
func (*CachingConsistentStoragesGetter) Disconnect ¶
func (c *CachingConsistentStoragesGetter) Disconnect(error)
Disconnect disables the caching when connection to Postgres has been lost.
func (*CachingConsistentStoragesGetter) GetConsistentStorages ¶
func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
GetConsistentStorages returns list of gitaly storages that are in up to date state based on the generation tracking.
func (*CachingConsistentStoragesGetter) Notification ¶
func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification)
Notification handles notifications by invalidating cache entries of updated repositories.
type ChangeType ¶
type ChangeType string
ChangeType indicates what kind of change the replication is propagating
func (ChangeType) String ¶
func (ct ChangeType) String() string
type ConsistentStoragesGetter ¶
type ConsistentStoragesGetter interface { // GetConsistentStorages checks which storages are on the latest generation and returns them. Returns a // commonerr.RepositoryNotFoundError if the repository does not exist. GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) }
ConsistentStoragesGetter returns storages which contain the latest generation of a repository.
type DowngradeAttemptedError ¶
type DowngradeAttemptedError struct { VirtualStorage string RelativePath string Storage string CurrentGeneration int AttemptedGeneration int }
DowngradeAttemptedError is returned when attempting to get the replicated generation for a source repository that does not upgrade the target repository.
func (DowngradeAttemptedError) Error ¶
func (err DowngradeAttemptedError) Error() string
type InvalidArgumentError ¶
type InvalidArgumentError struct {
// contains filtered or unexported fields
}
InvalidArgumentError tags the error as being caused by an invalid argument.
type MigrationStatusRow ¶
MigrationStatusRow represents an entry in the schema migrations table. If the migration is in the database but is not listed, Unknown will be true.
type MockReplicationEventQueue ¶
type MockReplicationEventQueue struct { ReplicationEventQueue EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error) }
MockReplicationEventQueue is a helper for tests that implements ReplicationEventQueue and allows for parametrizing behavior.
func (*MockReplicationEventQueue) Enqueue ¶
func (m *MockReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
type MockRepositoryStore ¶
type MockRepositoryStore struct { GetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) GetPartiallyReplicatedRepositoriesFunc func(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error) DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) }
MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods default to what could be considered success if not set.
func (MockRepositoryStore) CreateRepository ¶
func (MockRepositoryStore) DeleteInvalidRepository ¶
func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
func (MockRepositoryStore) DeleteReplica ¶
func (m MockRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteReplica runs the mock's DeleteReplicaFunc.
func (MockRepositoryStore) DeleteRepository ¶
func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
func (MockRepositoryStore) GetConsistentStorages ¶
func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map.
func (MockRepositoryStore) GetGeneration ¶
func (MockRepositoryStore) GetPartiallyReplicatedRepositories ¶
func (m MockRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStorageScopedPrimaries bool) ([]OutdatedRepository, error)
func (MockRepositoryStore) GetReplicatedGeneration ¶
func (MockRepositoryStore) IncrementGeneration ¶
func (MockRepositoryStore) RenameRepository ¶
func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
func (MockRepositoryStore) RepositoryExists ¶
func (MockRepositoryStore) SetGeneration ¶
type OutdatedRepository ¶
type OutdatedRepository struct { // RelativePath is the relative path of the repository. RelativePath string // Primary is the current primary of this repository. Primary string // Storages contains information of the repository on each storage that contains the repository // or does not contain the repository but is assigned to host it. Storages []OutdatedRepositoryStorageDetails }
OutdatedRepository is a repository with one or more outdated assigned storages.
type OutdatedRepositoryStorageDetails ¶
type OutdatedRepositoryStorageDetails struct { // Name of the storage as configured. Name string // BehindBy indicates how many generations the storage's copy of the repository is missing at maximum. BehindBy int // Assigned indicates whether the storage is an assigned host of the repository. Assigned bool }
OutdatedRepositoryStorageDetails represents a storage that contains or should contain a copy of the repository.
type Params ¶
type Params map[string]interface{}
Params represent additional information required to process event after fetching it from storage. It must be JSON encodable/decodable to persist it without problems.
type PostgresListener ¶
type PostgresListener struct {
// contains filtered or unexported fields
}
PostgresListener is an implementation based on the PostgreSQL LISTEN/NOTIFY functions.
func NewPostgresListener ¶
func NewPostgresListener(logger logrus.FieldLogger, opts PostgresListenerOpts, handler glsql.ListenHandler) (*PostgresListener, error)
NewPostgresListener returns a new instance of the listener.
func (*PostgresListener) Close ¶
func (pgl *PostgresListener) Close() error
func (*PostgresListener) Collect ¶
func (pgl *PostgresListener) Collect(metrics chan<- promclient.Metric)
func (*PostgresListener) Describe ¶
func (pgl *PostgresListener) Describe(descs chan<- *promclient.Desc)
type PostgresListenerOpts ¶
type PostgresListenerOpts struct { // Addr is an address to database instance. Addr string // Channels is a list of channels to listen for notifications. Channels []string // PingPeriod is a period to wait before executing a ping call on the connection to verify if it is still healthy. PingPeriod time.Duration // MinReconnectInterval controls the duration to wait before trying to // re-establish the database connection after connection loss. MinReconnectInterval time.Duration // MaxReconnectInterval is a max interval to wait until successful connection establishment. MaxReconnectInterval time.Duration }
PostgresListenerOpts is a set of configuration options for the PostgreSQL listener.
type PostgresReplicationEventQueue ¶
type PostgresReplicationEventQueue struct {
// contains filtered or unexported fields
}
PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
func NewPostgresReplicationEventQueue ¶
func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue
NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage.
func (PostgresReplicationEventQueue) Acknowledge ¶
func (PostgresReplicationEventQueue) AcknowledgeStale ¶
func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
AcknowledgeStale moves replication events that have been in the 'in_progress' state for too long (more than staleAfter) into the next state:
'failed' - in case it has more attempts to be executed 'dead' - in case it has no more attempts to be executed
A job is considered 'in_progress' if it has a corresponding entry in the 'replication_queue_job_lock' table. When a job moves from 'in_progress' to another state, its entry in the 'replication_queue_job_lock' table is removed and the entry in the 'replication_queue_lock' table is updated if needed (releasing the lock).
func (PostgresReplicationEventQueue) Dequeue ¶
func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
func (PostgresReplicationEventQueue) Enqueue ¶
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
func (PostgresReplicationEventQueue) StartHealthUpdate ¶
func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
StartHealthUpdate starts periodical update of the event's health identifier. The events with fresh health identifier won't be considered as stale. The health update will be executed on each new entry received from trigger channel passed in. It is a blocking call that is managed by the passed in context.
type PostgresRepositoryStore ¶
type PostgresRepositoryStore struct {
// contains filtered or unexported fields
}
PostgresRepositoryStore is a Postgres implementation of RepositoryStore. Refer to the interface for method documentation.
func NewPostgresRepositoryStore ¶
func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore
NewPostgresRepositoryStore returns a Postgres implementation of RepositoryStore.
func (*PostgresRepositoryStore) CreateRepository ¶
func (*PostgresRepositoryStore) DeleteInvalidRepository ¶
func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
func (*PostgresRepositoryStore) DeleteReplica ¶
func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details.
func (*PostgresRepositoryStore) DeleteRepository ¶
func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error
func (*PostgresRepositoryStore) GetConsistentStorages ¶
func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error)
GetConsistentStorages checks which storages are on the latest generation and returns them.
func (*PostgresRepositoryStore) GetGeneration ¶
func (*PostgresRepositoryStore) GetPartiallyReplicatedRepositories ¶
func (rs *PostgresRepositoryStore) GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, useVirtualStoragePrimaries bool) ([]OutdatedRepository, error)
func (*PostgresRepositoryStore) GetReplicatedGeneration ¶
func (*PostgresRepositoryStore) IncrementGeneration ¶
func (*PostgresRepositoryStore) RenameRepository ¶
func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
func (*PostgresRepositoryStore) RepositoryExists ¶
func (*PostgresRepositoryStore) SetGeneration ¶
type ReplicationEvent ¶
type ReplicationEvent struct { ID uint64 State JobState Attempt int LockID string CreatedAt time.Time UpdatedAt *time.Time Job ReplicationJob Meta Params }
ReplicationEvent is a persistent representation of the replication event.
func (*ReplicationEvent) Mapping ¶
func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error)
Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
type ReplicationEventQueue ¶
type ReplicationEventQueue interface { // Enqueue puts provided event into the persistent queue. Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) // Dequeue retrieves events from the persistent queue using provided limitations and filters. Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. // It updates events that are in 'in_progress' state to the state that is passed in. // It also updates state of similar events (scheduled for the same repository with same change from the same source) // that are in 'ready' state and created before the target event was dequeued for processing if the new state is // 'completed'. Otherwise it won't be changed. // It returns sub-set of passed in ids that were updated. Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) // StartHealthUpdate starts periodical update of the event's health identifier. // The events with fresh health identifier won't be considered as stale. // The health update will be executed on each new entry received from trigger channel passed in. // It is a blocking call that is managed by the passed in context. StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error // AcknowledgeStale moves replication events that are 'in_progress' state for too long (more than staleAfter) // into the next state: // 'failed' - in case it has more attempts to be executed // 'dead' - in case it has no more attempts to be executed AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error }
ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back.
func NewMemoryReplicationEventQueue ¶
func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue
NewMemoryReplicationEventQueue returns an in-memory implementation of the ReplicationEventQueue.
type ReplicationEventQueueInterceptor ¶
type ReplicationEventQueueInterceptor interface { // ReplicationEventQueue actual implementation. ReplicationEventQueue // OnEnqueue allows to set action that would be executed each time when `Enqueue` method called. OnEnqueue(func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) // OnDequeue allows to set action that would be executed each time when `Dequeue` method called. OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) // OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called. OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) // OnStartHealthUpdate allows to set action that would be executed each time when `StartHealthUpdate` method called. OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error) // OnAcknowledgeStale allows to set action that would be executed each time when `AcknowledgeStale` method called. OnAcknowledgeStale(func(context.Context, time.Duration) error) }
ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface.
func NewReplicationEventQueueInterceptor ¶
func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) ReplicationEventQueueInterceptor
NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
type ReplicationJob ¶
type ReplicationJob struct { Change ChangeType `json:"change"` RelativePath string `json:"relative_path"` TargetNodeStorage string `json:"target_node_storage"` SourceNodeStorage string `json:"source_node_storage"` VirtualStorage string `json:"virtual_storage"` Params Params `json:"params"` }
ReplicationJob is a persistent representation of the replication job.
func (*ReplicationJob) Scan ¶
func (job *ReplicationJob) Scan(value interface{}) error
type RepositoryExistsError ¶
type RepositoryExistsError struct {
// contains filtered or unexported fields
}
RepositoryExistsError is returned when trying to create a repository that already exists.
func (RepositoryExistsError) Error ¶
func (err RepositoryExistsError) Error() string
Error returns the error's message.
func (RepositoryExistsError) Is ¶
func (err RepositoryExistsError) Is(other error) bool
Is checks whether the other error is of the same type.
type RepositoryNotExistsError ¶
type RepositoryNotExistsError struct {
// contains filtered or unexported fields
}
RepositoryNotExistsError is returned when trying to perform an operation on a non-existent repository.
func (RepositoryNotExistsError) Error ¶
func (err RepositoryNotExistsError) Error() string
Error returns the error's message.
func (RepositoryNotExistsError) Is ¶
func (err RepositoryNotExistsError) Is(other error) bool
Is checks whether the other error is of the same type.
type RepositoryStore ¶
type RepositoryStore interface { // GetGeneration gets the repository's generation on a given storage. GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) // IncrementGeneration increments the primary's and the up to date secondaries' generations. IncrementGeneration(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error // SetGeneration sets the repository's generation on the given storage. If the generation is higher // than the virtual storage's generation, it is set to match as well to guarantee monotonic increments. SetGeneration(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error // GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would // downgrade, a DowngradeAttemptedError is returned. GetReplicatedGeneration(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) // CreateRepository creates a record for a repository in the specified virtual storage and relative path. // Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated // and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed // the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store. // // storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as // the repository's primary. // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. 
CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error // DeleteRepository deletes the repository from the virtual storage and the storage. Returns // RepositoryNotExistsError when trying to delete a repository which has no record in the virtual storage // or the storage. DeleteRepository(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage. DeleteReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository // which has no record in the virtual storage or the storage. RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) // GetPartiallyReplicatedRepositories returns information on repositories which have an outdated copy on an assigned storage. // By default, repository specific primaries are returned in the results. If useVirtualStoragePrimaries is set, virtual storage's // primary is returned instead for each repository. GetPartiallyReplicatedRepositories(ctx context.Context, virtualStorage string, virtualStoragePrimaries bool) ([]OutdatedRepository, error) // DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. 
DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error }
RepositoryStore provides access to repository state.
type RepositoryStoreCollector ¶
type RepositoryStoreCollector struct {
// contains filtered or unexported fields
}
RepositoryStoreCollector collects metrics from the RepositoryStore.
func NewRepositoryStoreCollector ¶
func NewRepositoryStoreCollector(log logrus.FieldLogger, virtualStorages []string, db glsql.Querier, repositoryScoped bool) *RepositoryStoreCollector
NewRepositoryStoreCollector returns a new collector.
func (*RepositoryStoreCollector) Collect ¶
func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric)
func (*RepositoryStoreCollector) Describe ¶
func (c *RepositoryStoreCollector) Describe(ch chan<- *prometheus.Desc)
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| advisorylock | Package advisorylock contains the lock IDs of all advisory locks used in Praefect. |
| glsql | Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries. |