datastore

package
v14.10.4
Published: Jun 1, 2022 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.

See original design discussion: https://gitlab.com/gitlab-org/gitaly/issues/1495

Index

Constants

const (
	// JobStateReady indicates the job is now ready to proceed.
	JobStateReady = JobState("ready")
	// JobStateInProgress indicates the job is being processed by a worker.
	JobStateInProgress = JobState("in_progress")
	// JobStateCompleted indicates the job is now complete.
	JobStateCompleted = JobState("completed")
	// JobStateFailed indicates the job did not succeed. The Replicator will retry
	// failed jobs.
	JobStateFailed = JobState("failed")
	// JobStateDead indicates the job was retried up to the maximum retries.
	JobStateDead = JobState("dead")
)
const (
	// UpdateRepo is when a replication updates a repository in place
	UpdateRepo = ChangeType("update")
	// CreateRepo is when a replication creates a repo
	CreateRepo = ChangeType("create")
	// DeleteRepo is when a replication deletes a repo
	DeleteRepo = ChangeType("delete")
	// DeleteReplica change type indicates that the targeted replica is due for deletion.
	DeleteReplica = ChangeType("delete_replica")
	// RenameRepo is when a replication renames repo
	RenameRepo = ChangeType("rename")
	// GarbageCollect is when replication runs gc
	GarbageCollect = ChangeType("gc")
	// RepackFull is when replication runs a full repack
	RepackFull = ChangeType("repack_full")
	// RepackIncremental is when replication runs an incremental repack
	RepackIncremental = ChangeType("repack_incremental")
	// Cleanup is when replication runs a repo cleanup
	Cleanup = ChangeType("cleanup")
	// PackRefs is when replication optimizes references in a repo
	PackRefs = ChangeType("pack_refs")
	// WriteCommitGraph is when replication writes a commit graph
	WriteCommitGraph = ChangeType("write_commit_graph")
	// MidxRepack is when replication does a multi-pack-index repack
	MidxRepack = ChangeType("midx_repack")
	// OptimizeRepository is when replication optimizes a repository
	OptimizeRepository = ChangeType("optimize_repository")
	// PruneUnreachableObjects is when replication prunes unreachable objects in a repository
	PruneUnreachableObjects = ChangeType("prune_unreachable_objects")
)
const (
	// StorageRepositoriesUpdatesChannel is a name of the database event channel
	// used to send events with changes done to 'storage_repositories' table.
	StorageRepositoriesUpdatesChannel = "storage_repositories_updates"
	// RepositoriesUpdatesChannel is a name of the database event channel
	// used to send events with changes done to 'repositories' table.
	RepositoriesUpdatesChannel = "repositories_updates"
)
const GenerationUnknown = -1

GenerationUnknown indicates the lack of a generation number in a replication job. Older instances can produce replication jobs without a generation number.

Variables

var ErrNoRowsAffected = errors.New("no rows were affected by the query")

ErrNoRowsAffected is returned when a query did not perform any changes.

Functions

func CheckPostgresVersion

func CheckPostgresVersion(db *sql.DB) error

CheckPostgresVersion checks the server version of the Postgres database that db is connected to. This is a diagnostic for the Praefect Postgres rollout. https://gitlab.com/gitlab-org/gitaly/issues/1755

func CountUnavailableRepositories added in v14.7.0

func CountUnavailableRepositories(ctx context.Context, db glsql.Querier, virtualStorages []string) (map[string]int, error)

CountUnavailableRepositories queries the number of unavailable repositories from the database. A repository is unavailable when it has no replicas that can act as a primary, indicating they are either unhealthy or out of date.

func MigrateDown

func MigrateDown(conf config.Config, max int) (int, error)

MigrateDown rolls back at most max migrations.

func MigrateDownPlan

func MigrateDownPlan(conf config.Config, max int) ([]string, error)

MigrateDownPlan does a dry run for rolling back at most max migrations.

func MigrateStatus

func MigrateStatus(conf config.Config) (map[string]*MigrationStatusRow, error)

MigrateStatus returns the status of database migrations. The key of the map indexes the migration ID.

Types

type AcknowledgeParams added in v14.4.0

type AcknowledgeParams struct {
	State JobState
	IDs   []uint64
}

AcknowledgeParams is the list of parameters used for Acknowledge method call.

type AssignmentStore

type AssignmentStore struct {
	// contains filtered or unexported fields
}

AssignmentStore manages host assignments in Postgres.

func NewAssignmentStore

func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore

NewAssignmentStore returns a new AssignmentStore using the passed in database.

func (AssignmentStore) GetHostAssignments

func (s AssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage string, repositoryID int64) ([]string, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (AssignmentStore) SetReplicationFactor

func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)

SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. Please see the protobuf documentation of the method for details.

type CachingConsistentStoragesGetter

type CachingConsistentStoragesGetter struct {
	// contains filtered or unexported fields
}

CachingConsistentStoragesGetter is a ConsistentStoragesGetter that caches up to date storages by repository. Each virtual storage has its own cache that invalidates entries based on notifications.

func NewCachingConsistentStoragesGetter

func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg ConsistentStoragesGetter, virtualStorages []string) (*CachingConsistentStoragesGetter, error)

NewCachingConsistentStoragesGetter returns a ConsistentStoragesGetter that uses caching.

func (*CachingConsistentStoragesGetter) Collect

func (c *CachingConsistentStoragesGetter) Collect(collector chan<- prometheus.Metric)

Collect collects all metrics.

func (*CachingConsistentStoragesGetter) Connected

func (c *CachingConsistentStoragesGetter) Connected()

Connected enables the cache when it has been connected to Postgres.

func (*CachingConsistentStoragesGetter) Describe

func (c *CachingConsistentStoragesGetter) Describe(descs chan<- *prometheus.Desc)

Describe returns all metric descriptors.

func (*CachingConsistentStoragesGetter) Disconnect

func (c *CachingConsistentStoragesGetter) Disconnect(error)

Disconnect disables the caching when connection to Postgres has been lost.

func (*CachingConsistentStoragesGetter) GetConsistentStorages

func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)

GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.

func (*CachingConsistentStoragesGetter) Notification

Notification handles notifications by invalidating cache entries of updated repositories.

type ChangeType

type ChangeType string

ChangeType indicates what kind of change the replication is propagating

func (ChangeType) String

func (ct ChangeType) String() string

type ClusterPath added in v14.0.12

type ClusterPath struct {
	// VirtualStorage is the name of the virtual storage.
	VirtualStorage string
	// Storage is the name of the gitaly storage.
	Storage string
}

ClusterPath represents the path on the cluster to a storage.

type ConsistentStoragesGetter

type ConsistentStoragesGetter interface {
	// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
	GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
}

ConsistentStoragesGetter returns storages which contain the latest generation of a repository.

type DequeParams added in v14.4.0

type DequeParams struct {
	VirtualStorage, NodeStorage string
	Count                       int
}

DequeParams is the list of parameters used for Dequeue method call.

type DowngradeAttemptedError

type DowngradeAttemptedError struct {
	Storage             string
	CurrentGeneration   int
	AttemptedGeneration int
}

DowngradeAttemptedError is returned when attempting to get the replicated generation for a source repository that does not upgrade the target repository.

func (DowngradeAttemptedError) Error

func (err DowngradeAttemptedError) Error() string

type InvalidArgumentError

type InvalidArgumentError struct {
	// contains filtered or unexported fields
}

InvalidArgumentError tags the error as being caused by an invalid argument.

type JobState

type JobState string

JobState is an enum that indicates the state of a job

func (JobState) String

func (js JobState) String() string

type Listener added in v14.7.0

type Listener struct {
	// contains filtered or unexported fields
}

Listener is designed to listen for PostgreSQL database NOTIFY events. It connects to the database with the Listen method and starts to listen for events.

func NewListener added in v14.7.0

func NewListener(conf config.DB) (*Listener, error)

NewListener returns a listener that is ready to listen for PostgreSQL notifications.

func (*Listener) Listen added in v14.7.0

func (l *Listener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error

Listen starts listening for the events. Each event is passed to the handler for processing. Listen is a blocking call; it returns when the context is cancelled or an error occurs while receiving notifications from the database.

type MigrationStatusRow

type MigrationStatusRow struct {
	Migrated  bool
	Unknown   bool
	AppliedAt time.Time
}

MigrationStatusRow represents an entry in the schema migrations table. If the migration is in the database but is not listed, Unknown will be true.
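
As a rough illustration of how the map returned by MigrateStatus might be consumed, the sketch below lists known migrations that have not yet been applied. MigrationStatusRow is mirrored locally so the example runs on its own; pendingMigrations and the migration IDs are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// MigrationStatusRow mirrors the documented struct.
type MigrationStatusRow struct {
	Migrated  bool
	Unknown   bool
	AppliedAt time.Time
}

// pendingMigrations is a hypothetical helper that returns the sorted IDs of
// migrations that are listed (not Unknown) but have not been applied yet.
func pendingMigrations(rows map[string]*MigrationStatusRow) []string {
	var pending []string
	for id, row := range rows {
		if row != nil && !row.Migrated && !row.Unknown {
			pending = append(pending, id)
		}
	}
	sort.Strings(pending)
	return pending
}

func main() {
	// The IDs below are made up for the example.
	rows := map[string]*MigrationStatusRow{
		"0001_example_applied": {Migrated: true, AppliedAt: time.Now()},
		"0002_example_pending": {},
		"0003_example_unknown": {Migrated: true, Unknown: true},
	}
	fmt.Println(pendingMigrations(rows))
}
```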

type MockReplicationEventQueue

type MockReplicationEventQueue struct {
	ReplicationEventQueue
	EnqueueFunc func(context.Context, ReplicationEvent) (ReplicationEvent, error)
}

MockReplicationEventQueue is a helper for tests that implements ReplicationEventQueue and allows for parametrizing behavior.

func (*MockReplicationEventQueue) Enqueue

nolint: revive,stylecheck // This is unintentionally missing documentation.

type MockRepositoryStore

type MockRepositoryStore struct {
	GetGenerationFunc                       func(ctx context.Context, repositoryID int64, storage string) (int, error)
	IncrementGenerationFunc                 func(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
	GetReplicatedGenerationFunc             func(ctx context.Context, repositoryID int64, source, target string) (int, error)
	SetGenerationFunc                       func(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
	CreateRepositoryFunc                    func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
	SetAuthoritativeReplicaFunc             func(ctx context.Context, virtualStorage, relativePath, storage string) error
	DeleteRepositoryFunc                    func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
	DeleteReplicaFunc                       func(ctx context.Context, repositoryID int64, storage string) error
	RenameRepositoryFunc                    func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
	GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
	GetConsistentStoragesFunc               func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
	GetPartiallyAvailableRepositoriesFunc   func(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
	DeleteInvalidRepositoryFunc             func(ctx context.Context, repositoryID int64, storage string) error
	RepositoryExistsFunc                    func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
	ReserveRepositoryIDFunc                 func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	GetRepositoryIDFunc                     func(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	GetReplicaPathFunc                      func(ctx context.Context, repositoryID int64) (string, error)
	GetRepositoryMetadataFunc               func(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
	GetRepositoryMetadataByPathFunc         func(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
}

MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods default to what could be considered success if not set.

func (MockRepositoryStore) CreateRepository

func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error

CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error.

func (MockRepositoryStore) DeleteInvalidRepository

func (m MockRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) DeleteReplica

func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error

DeleteReplica runs the mock's DeleteReplicaFunc.

func (MockRepositoryStore) DeleteRepository

func (m MockRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) GetConsistentStorages

func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)

GetConsistentStorages returns the result of calling GetConsistentStoragesFunc if it is set, or an empty map otherwise.

func (MockRepositoryStore) GetConsistentStoragesByRepositoryID added in v14.5.0

func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)

GetConsistentStoragesByRepositoryID returns the result of calling GetConsistentStoragesByRepositoryIDFunc if it is set, or an empty map otherwise.

func (MockRepositoryStore) GetGeneration

func (m MockRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) GetPartiallyAvailableRepositories added in v14.1.0

func (m MockRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)

GetPartiallyAvailableRepositories returns the result of GetPartiallyAvailableRepositoriesFunc or nil if it is unset.

func (MockRepositoryStore) GetReplicaPath added in v14.5.0

func (m MockRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)

GetReplicaPath returns the result of GetReplicaPathFunc or panics if it is unset.

func (MockRepositoryStore) GetReplicatedGeneration

func (m MockRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) GetRepositoryID added in v14.3.0

func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)

GetRepositoryID returns the result of GetRepositoryIDFunc or 0 if it is unset.

func (MockRepositoryStore) GetRepositoryMetadata added in v14.6.0

func (m MockRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)

GetRepositoryMetadata returns the result of GetRepositoryMetadataFunc or panics if it is unset.

func (MockRepositoryStore) GetRepositoryMetadataByPath added in v14.6.0

func (m MockRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)

GetRepositoryMetadataByPath returns the result of GetRepositoryMetadataByPathFunc or panics if it is unset.

func (MockRepositoryStore) IncrementGeneration

func (m MockRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) RenameRepository

func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) RepositoryExists

func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (MockRepositoryStore) ReserveRepositoryID added in v14.3.0

func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)

ReserveRepositoryID returns the result of ReserveRepositoryIDFunc or 0 if it is unset.

func (MockRepositoryStore) SetAuthoritativeReplica added in v14.2.0

func (m MockRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error

SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error.

func (MockRepositoryStore) SetGeneration

func (m MockRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

type Params

type Params map[string]interface{}

Params represents additional information required to process an event after fetching it from storage. It must be JSON encodable/decodable so that it can be persisted without problems.

func (Params) GetBool added in v14.1.0

func (p Params) GetBool(key string) (bool, error)

GetBool returns the boolean parameter associated with the given key. Returns an error if either the key does not exist, or if the value is not a bool.
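
The documented behavior of GetBool (an error for a missing key, an error for a non-bool value) could look roughly like the sketch below. Params is redeclared locally so the example is self-contained; the error messages and the key names are assumptions, not the package's actual output.

```go
package main

import "fmt"

// Params mirrors the documented map type.
type Params map[string]interface{}

// GetBool is a sketch of the documented contract: it fails when the key is
// absent or when the stored value is not a bool.
func (p Params) GetBool(key string) (bool, error) {
	val, found := p[key]
	if !found {
		return false, fmt.Errorf("key does not exist: %q", key)
	}
	b, ok := val.(bool)
	if !ok {
		return false, fmt.Errorf("value for %q is not a bool: %T", key, val)
	}
	return b, nil
}

func main() {
	// "example_flag" and "example_count" are hypothetical keys.
	p := Params{"example_flag": true, "example_count": 3}

	fmt.Println(p.GetBool("example_flag"))
	fmt.Println(p.GetBool("example_count"))
	fmt.Println(p.GetBool("missing"))
}
```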

func (*Params) Scan

func (p *Params) Scan(value interface{}) error

Scan assigns a value from a database driver.

func (Params) Value

func (p Params) Value() (driver.Value, error)

Value returns a driver Value.
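
Since Params must be JSON encodable, Scan and Value plausibly serialize the map as JSON when talking to the database driver. The sketch below shows one way to satisfy database/sql's Scanner and driver.Valuer interfaces under that assumption; it is not the package's actual implementation, and the accepted input types in Scan are a guess.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"fmt"
)

// Params mirrors the documented map type.
type Params map[string]interface{}

// Value encodes the map as a JSON string for the driver (assumed encoding).
func (p Params) Value() (driver.Value, error) {
	data, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return string(data), nil
}

// Scan decodes a JSON document handed over by the driver as []byte or string.
func (p *Params) Scan(value interface{}) error {
	if value == nil {
		return nil
	}
	var data []byte
	switch v := value.(type) {
	case []byte:
		data = v
	case string:
		data = []byte(v)
	default:
		return fmt.Errorf("unsupported type for Params: %T", value)
	}
	return json.Unmarshal(data, p)
}

func main() {
	in := Params{"example_flag": true}

	stored, err := in.Value()
	if err != nil {
		panic(err)
	}

	var out Params
	if err := out.Scan(stored); err != nil {
		panic(err)
	}
	fmt.Println(stored, out["example_flag"])
}
```

Note that a JSON round trip turns every number into a float64, so integer parameters need a conversion after Scan.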

type PostgresReplicationEventQueue

type PostgresReplicationEventQueue struct {
	// contains filtered or unexported fields
}

PostgresReplicationEventQueue is a Postgres implementation of persistent queue.

func NewPostgresReplicationEventQueue

func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue

NewPostgresReplicationEventQueue returns a new instance that uses the provided Querier to access storage.

func (PostgresReplicationEventQueue) Acknowledge

func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (PostgresReplicationEventQueue) AcknowledgeStale

func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)

AcknowledgeStale moves replication events that have been in the 'in_progress' state for too long (more than staleAfter) into the next state:

'failed' - in case it has more attempts to be executed
'dead' - in case it has no more attempts to be executed

A job is considered 'in_progress' if it has a corresponding entry in the 'replication_queue_job_lock' table. When a job moves from 'in_progress' to another state, its entry is removed from the 'replication_queue_job_lock' table and the entry in 'replication_queue_lock' is updated if needed (release of the lock).

func (PostgresReplicationEventQueue) Dequeue

func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (PostgresReplicationEventQueue) Enqueue

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (PostgresReplicationEventQueue) StartHealthUpdate

func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error

StartHealthUpdate starts a periodic update of the events' health identifiers. Events with a fresh health identifier are not considered stale. The health update is executed each time a value is received from the passed-in trigger channel. It is a blocking call that is managed by the passed-in context.

type PostgresRepositoryStore

type PostgresRepositoryStore struct {
	// contains filtered or unexported fields
}

PostgresRepositoryStore is a Postgres implementation of RepositoryStore. Refer to the interface for method documentation.

func NewPostgresRepositoryStore

func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore

NewPostgresRepositoryStore returns a Postgres implementation of RepositoryStore.

func (*PostgresRepositoryStore) CreateRepository

func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error

CreateRepository creates a record for a repository in the specified virtual storage and relative path. Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store.

storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as the repository's primary.

storeAssignments should be set when variable replication factor is enabled. When set, the primary and the secondaries are stored as the assigned hosts of the repository.

func (*PostgresRepositoryStore) DeleteInvalidRepository

func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error

DeleteInvalidRepository deletes the given replica. If the replica was the only replica of the repository, then the repository will be deleted, as well.

func (*PostgresRepositoryStore) DeleteReplica

func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error

DeleteReplica deletes a record from the `storage_repositories`. See the interface documentation for details.

func (*PostgresRepositoryStore) DeleteRepository

func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) GetConsistentStorages

func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)

GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.

func (*PostgresRepositoryStore) GetConsistentStoragesByRepositoryID added in v14.5.0

func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)

GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.

func (*PostgresRepositoryStore) GetGeneration

func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) GetPartiallyAvailableRepositories added in v14.1.0

func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)

GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which are not able to serve requests at the moment.

func (*PostgresRepositoryStore) GetReplicaPath added in v14.5.0

func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)

GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record for the repository ID is not found.

func (*PostgresRepositoryStore) GetReplicatedGeneration

func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) GetRepositoryID added in v14.3.0

func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)

GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a RepositoryNotFoundError if the repository doesn't exist.

func (*PostgresRepositoryStore) GetRepositoryMetadata added in v14.6.0

func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)

GetRepositoryMetadata retrieves a repository's metadata.

func (*PostgresRepositoryStore) GetRepositoryMetadataByPath added in v14.6.0

func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)

GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path.

func (*PostgresRepositoryStore) IncrementGeneration

func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) RenameRepository

func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) RepositoryExists

func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)

nolint: revive,stylecheck // This is unintentionally missing documentation.

func (*PostgresRepositoryStore) ReserveRepositoryID added in v14.3.0

func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)

ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already exists with the given virtual storage and relative path combination, an error is returned.

func (*PostgresRepositoryStore) SetAuthoritativeReplica added in v14.2.0

func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error

SetAuthoritativeReplica sets the given replica of a repository as the authoritative one by setting its generation as the latest one.

func (*PostgresRepositoryStore) SetGeneration

func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error

nolint: revive,stylecheck // This is unintentionally missing documentation.

type QueueDepthCollector added in v14.7.0

type QueueDepthCollector struct {
	// contains filtered or unexported fields
}

QueueDepthCollector collects metrics describing replication queue depths

func NewQueueDepthCollector added in v14.7.0

func NewQueueDepthCollector(log logrus.FieldLogger, db glsql.Querier, timeout time.Duration) *QueueDepthCollector

NewQueueDepthCollector returns a new QueueDepthCollector

func (*QueueDepthCollector) Collect added in v14.7.0

func (q *QueueDepthCollector) Collect(ch chan<- prometheus.Metric)

Collect collects metrics describing the replication queue depth

func (*QueueDepthCollector) Describe added in v14.7.0

func (q *QueueDepthCollector) Describe(ch chan<- *prometheus.Desc)

nolint: revive,stylecheck // This is unintentionally missing documentation.

type Replica added in v14.6.0

type Replica struct {
	// Storage is the name of the replica's storage.
	Storage string
	// Generation is the replica's confirmed generation. If the replica does not yet exist, generation
	// is -1.
	Generation int64
	// Assigned indicates whether the storage is an assigned host of the repository.
	Assigned bool
	// Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes.
	Healthy bool
	// ValidPrimary indicates whether the replica is ready to serve as the primary if necessary.
	ValidPrimary bool
	// VerifiedAt is the last successful verification time of the replica.
	VerifiedAt time.Time
}

Replica represents a replica of a repository.

type ReplicationEvent

type ReplicationEvent struct {
	ID        uint64
	State     JobState
	Attempt   int
	LockID    string
	CreatedAt time.Time
	UpdatedAt *time.Time
	Job       ReplicationJob
	Meta      Params
}

ReplicationEvent is a persistent representation of the replication event.

func (*ReplicationEvent) Mapping

func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error)

Mapping returns a list of references to the struct fields that correspond to the SQL columns/column aliases.

func (*ReplicationEvent) Scan

func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error

Scan fills the receiver's fields with values fetched from the database based on the set of columns/column aliases.

type ReplicationEventQueue

type ReplicationEventQueue interface {
	// Enqueue puts provided event into the persistent queue.
	Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
	// Dequeue retrieves events from the persistent queue using provided limitations and filters.
	Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
	// Acknowledge updates previously dequeued events with the new state and releases resources acquired for it.
	// It updates events that are in 'in_progress' state to the state that is passed in.
	// It also updates the state of similar events (scheduled for the same repository with the same change from the same source)
	// that are in 'ready' state and were created before the target event was dequeued for processing, if the new state is
	// 'completed'. Otherwise they are not changed.
	// It returns sub-set of passed in ids that were updated.
	Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
	// StartHealthUpdate starts periodical update of the event's health identifier.
	// The events with fresh health identifier won't be considered as stale.
	// The health update will be executed on each new entry received from trigger channel passed in.
	// It is a blocking call that is managed by the passed in context.
	StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
	// AcknowledgeStale moves replication events that have been in the 'in_progress' state for too long
	// (more than staleAfter) into the next state:
	//   'failed' - if the event has attempts left
	//   'dead' - if the event has no attempts left
	AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)
}

ReplicationEventQueue allows putting new events into the persistent queue and retrieving them later.
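The state transitions described in the interface comments can be sketched with a small in-memory model. This is illustrative only: `memQueue`, `event`, `maxAttempts`, `staleState`, the simplified signatures (no context, no storage filters, no locking), and the assumption that 'failed' events are picked up again by Dequeue are not taken from the package.

```go
package main

import "fmt"

// JobState mirrors the job states documented for this package.
type JobState string

const (
	JobStateReady      JobState = "ready"
	JobStateInProgress JobState = "in_progress"
	JobStateCompleted  JobState = "completed"
	JobStateFailed     JobState = "failed"
	JobStateDead       JobState = "dead"
)

// maxAttempts is an assumed retry budget, not a value taken from the package.
const maxAttempts = 3

// event is a hypothetical, trimmed-down stand-in for ReplicationEvent.
type event struct {
	ID      uint64
	State   JobState
	Attempt int // attempts left (an assumption of this sketch)
}

// memQueue is a hypothetical in-memory queue used only for illustration.
type memQueue struct {
	nextID uint64
	events []*event
}

// Enqueue stores a new event in the 'ready' state and returns its ID.
func (q *memQueue) Enqueue() uint64 {
	q.nextID++
	q.events = append(q.events, &event{ID: q.nextID, State: JobStateReady, Attempt: maxAttempts})
	return q.nextID
}

// Dequeue moves up to count 'ready' or 'failed' events to 'in_progress',
// consuming one attempt each, and returns their IDs.
func (q *memQueue) Dequeue(count int) []uint64 {
	var ids []uint64
	for _, e := range q.events {
		if len(ids) == count {
			break
		}
		if e.State == JobStateReady || e.State == JobStateFailed {
			e.State = JobStateInProgress
			e.Attempt--
			ids = append(ids, e.ID)
		}
	}
	return ids
}

// Acknowledge moves 'in_progress' events to state and returns the IDs it updated.
func (q *memQueue) Acknowledge(state JobState, ids []uint64) []uint64 {
	var updated []uint64
	for _, id := range ids {
		for _, e := range q.events {
			if e.ID == id && e.State == JobStateInProgress {
				e.State = state
				updated = append(updated, id)
			}
		}
	}
	return updated
}

// staleState picks the next state for an event that stayed 'in_progress' too long.
func staleState(attemptsLeft int) JobState {
	if attemptsLeft > 0 {
		return JobStateFailed
	}
	return JobStateDead
}

func main() {
	q := &memQueue{}
	q.Enqueue()
	q.Enqueue()
	fmt.Println("dequeued:", q.Dequeue(1))
	// Event 2 was never dequeued, so only event 1 is reported as updated.
	fmt.Println("acknowledged:", q.Acknowledge(JobStateCompleted, []uint64{1, 2}))
	fmt.Println("stale with attempts left:", staleState(1))
	fmt.Println("stale without attempts:", staleState(0))
}
```

Acknowledge returning only the IDs it actually changed lets a caller detect events that were no longer 'in_progress' when the call was made.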

func NewMemoryReplicationEventQueue

func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue

NewMemoryReplicationEventQueue returns an in-memory implementation of the ReplicationEventQueue.

type ReplicationEventQueueInterceptor

type ReplicationEventQueueInterceptor struct {
	ReplicationEventQueue
	// contains filtered or unexported fields
}

ReplicationEventQueueInterceptor allows registering interceptors for the `ReplicationEventQueue` interface. It also provides additional methods to get information about data going into and coming out of the underlying queue. NOTE: it should be used for testing purposes only, as it keeps data in memory and never cleans it up.

func NewReplicationEventQueueInterceptor

func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) *ReplicationEventQueueInterceptor

NewReplicationEventQueueInterceptor returns an interceptor that wraps the given `ReplicationEventQueue`.
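The interception pattern (embed the interface, record inputs and outputs, optionally divert to a callback) can be sketched as follows. Everything here is hypothetical and simplified: `Queue` is a one-method stand-in for `ReplicationEventQueue`, payloads are plain strings, and no synchronisation is included.

```go
package main

import "fmt"

// Queue is a hypothetical one-method stand-in for ReplicationEventQueue.
type Queue interface {
	Enqueue(payload string) (string, error)
}

// storeQueue is a hypothetical underlying implementation.
type storeQueue struct{}

func (storeQueue) Enqueue(payload string) (string, error) { return "stored:" + payload, nil }

// Interceptor embeds Queue so that methods it does not override are
// forwarded to the underlying implementation unchanged.
type Interceptor struct {
	Queue
	onEnqueue      func(string) (string, error)
	enqueued       []string
	enqueuedResult []string
}

// NewInterceptor wraps the given queue.
func NewInterceptor(q Queue) *Interceptor { return &Interceptor{Queue: q} }

// OnEnqueue registers a callback that replaces the underlying Enqueue.
func (i *Interceptor) OnEnqueue(action func(string) (string, error)) { i.onEnqueue = action }

// Enqueue records the input, calls the callback if one is set (otherwise the
// underlying queue), and records the result of successful calls.
func (i *Interceptor) Enqueue(payload string) (string, error) {
	i.enqueued = append(i.enqueued, payload)
	call := i.Queue.Enqueue
	if i.onEnqueue != nil {
		call = i.onEnqueue
	}
	res, err := call(payload)
	if err == nil {
		i.enqueuedResult = append(i.enqueuedResult, res)
	}
	return res, err
}

// GetEnqueued returns every payload passed to Enqueue so far.
func (i *Interceptor) GetEnqueued() []string { return i.enqueued }

// GetEnqueuedResult returns every successful Enqueue result so far.
func (i *Interceptor) GetEnqueuedResult() []string { return i.enqueuedResult }

func main() {
	i := NewInterceptor(storeQueue{})
	i.Enqueue("a")
	i.OnEnqueue(func(p string) (string, error) { return "fake:" + p, nil })
	i.Enqueue("b")
	fmt.Println(i.GetEnqueued(), i.GetEnqueuedResult())
}
```

Because the recorded slices only grow, a wrapper like this suits short-lived tests rather than long-running processes.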

func (*ReplicationEventQueueInterceptor) Acknowledge added in v14.4.0

func (i *ReplicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)

Acknowledge intercepts calls to the Acknowledge method of the underlying implementation or a callback. It records the incoming and outgoing parameters before and after the method call.

func (*ReplicationEventQueueInterceptor) AcknowledgeStale added in v14.4.0

func (i *ReplicationEventQueueInterceptor) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) (int64, error)

AcknowledgeStale intercepts calls to the AcknowledgeStale method of the underlying implementation or a callback.

func (*ReplicationEventQueueInterceptor) Dequeue added in v14.4.0

func (i *ReplicationEventQueueInterceptor) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)

Dequeue intercepts calls to the Dequeue method of the underlying implementation or a callback. It records the incoming and outgoing parameters before and after the method call.

func (*ReplicationEventQueueInterceptor) Enqueue added in v14.4.0

Enqueue intercepts calls to the Enqueue method of the underlying implementation or a callback. It records the incoming and outgoing parameters before and after the method call.

func (*ReplicationEventQueueInterceptor) GetAcknowledge added in v14.4.0

GetAcknowledge returns the list of parameters passed to each invocation of the Acknowledge method or its callback.

func (*ReplicationEventQueueInterceptor) GetAcknowledgeResult added in v14.4.0

func (i *ReplicationEventQueueInterceptor) GetAcknowledgeResult() [][]uint64

GetAcknowledgeResult returns the list of results returned by each invocation of the Acknowledge method or its callback.

func (*ReplicationEventQueueInterceptor) GetDequeued added in v14.4.0

func (i *ReplicationEventQueueInterceptor) GetDequeued() []DequeParams

GetDequeued returns the list of parameters passed to each invocation of the Dequeue method or its callback.

func (*ReplicationEventQueueInterceptor) GetDequeuedResult added in v14.4.0

func (i *ReplicationEventQueueInterceptor) GetDequeuedResult() [][]ReplicationEvent

GetDequeuedResult returns the list of events returned by each invocation of the Dequeue method or its callback.

func (*ReplicationEventQueueInterceptor) GetEnqueued added in v14.4.0

GetEnqueued returns the list of events passed to each invocation of the Enqueue method or its callback.

func (*ReplicationEventQueueInterceptor) GetEnqueuedResult added in v14.4.0

func (i *ReplicationEventQueueInterceptor) GetEnqueuedResult() []ReplicationEvent

GetEnqueuedResult returns the list of events returned by each invocation of the Enqueue method or its callback.

func (*ReplicationEventQueueInterceptor) OnAcknowledge

OnAcknowledge sets the action to be executed each time the `Acknowledge` method is called.

func (*ReplicationEventQueueInterceptor) OnAcknowledgeStale

func (i *ReplicationEventQueueInterceptor) OnAcknowledgeStale(action func(context.Context, time.Duration) (int64, error))

OnAcknowledgeStale sets the action to be executed each time the `AcknowledgeStale` method is called.

func (*ReplicationEventQueueInterceptor) OnDequeue

OnDequeue sets the action to be executed each time the `Dequeue` method is called.

func (*ReplicationEventQueueInterceptor) OnEnqueue

OnEnqueue sets the action to be executed each time the `Enqueue` method is called.

func (*ReplicationEventQueueInterceptor) OnStartHealthUpdate

func (i *ReplicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error)

OnStartHealthUpdate sets the action to be executed each time the `StartHealthUpdate` method is called.

func (*ReplicationEventQueueInterceptor) StartHealthUpdate added in v14.4.0

func (i *ReplicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error

StartHealthUpdate intercepts calls to the StartHealthUpdate method of the underlying implementation or a callback.

func (*ReplicationEventQueueInterceptor) Wait added in v14.4.0

Wait checks the condition in a loop, pausing between checks, until it returns true or the deadline is exceeded. An error is returned only if the deadline is exceeded.
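A poll-until-deadline loop of the kind described can be sketched as below. The function name `waitFor`, its parameters, and the sentinel `errDeadline` are hypothetical; the signature of Wait itself is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDeadline is a hypothetical sentinel returned when the deadline passes.
var errDeadline = errors.New("deadline exceeded")

// waitFor calls condition repeatedly, sleeping for interval between calls,
// until it returns true or timeout has elapsed.
func waitFor(timeout, interval time.Duration, condition func() bool) error {
	deadline := time.Now().Add(timeout)
	for {
		if condition() {
			return nil
		}
		if !time.Now().Before(deadline) {
			return errDeadline
		}
		time.Sleep(interval)
	}
}

// runDemo exercises both outcomes: a condition that becomes true on its
// third check, and one that never does.
func runDemo() (met, timedOut error) {
	calls := 0
	met = waitFor(time.Second, time.Millisecond, func() bool {
		calls++
		return calls >= 3
	})
	timedOut = waitFor(20*time.Millisecond, time.Millisecond, func() bool { return false })
	return met, timedOut
}

func main() {
	met, timedOut := runDemo()
	fmt.Println("condition met:", met)
	fmt.Println("condition never met:", timedOut)
}
```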

type ReplicationJob

type ReplicationJob struct {
	// RepositoryID is the ID of the repository this job relates to. RepositoryID
	// may be 0 if the job doesn't relate to any known repository. This can happen
	// for example when the job is deleting an orphaned replica of a deleted repository.
	RepositoryID int64 `json:"repository_id"`
	// ReplicaPath is the relative path where the replicas are stored in the Gitaly storages.
	ReplicaPath string     `json:"replica_path"`
	Change      ChangeType `json:"change"`
	// RelativePath is the virtual relative path the client uses to access the repository on the
	// virtual storage. The actual path that is used to store the repository on the disks is the
	// ReplicaPath. This can be removed in the future but is still carried in the jobs as the
	// replication queue locking depends on this.
	RelativePath      string `json:"relative_path"`
	TargetNodeStorage string `json:"target_node_storage"`
	SourceNodeStorage string `json:"source_node_storage"`
	VirtualStorage    string `json:"virtual_storage"`
	Params            Params `json:"params"`
}

ReplicationJob is a persistent representation of the replication job.

func (*ReplicationJob) Scan

func (job *ReplicationJob) Scan(value interface{}) error

Scan matches the Scan method of the database/sql Scanner interface. It is undocumented in the source.

func (ReplicationJob) Value

func (job ReplicationJob) Value() (driver.Value, error)

Value matches the Value method of the database/sql/driver Valuer interface. It is undocumented in the source.
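The JSON struct tags on ReplicationJob, together with the Scan and Value signatures, suggest that a job is stored in a single database column as JSON. The following is a minimal sketch under that assumption only; the source does not document how Scan and Value behave. The type `job` is a hypothetical copy of the struct with `Change` as a plain string and the `Params` field omitted, and `roundTrip` is a helper invented for the example.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"fmt"
)

// job is a hypothetical copy of ReplicationJob without the Params field.
type job struct {
	RepositoryID      int64  `json:"repository_id"`
	ReplicaPath       string `json:"replica_path"`
	Change            string `json:"change"`
	RelativePath      string `json:"relative_path"`
	TargetNodeStorage string `json:"target_node_storage"`
	SourceNodeStorage string `json:"source_node_storage"`
	VirtualStorage    string `json:"virtual_storage"`
}

// Value encodes the job as a JSON string (assumed behaviour).
func (j job) Value() (driver.Value, error) {
	data, err := json.Marshal(j)
	if err != nil {
		return nil, err
	}
	return string(data), nil
}

// Scan decodes a JSON document delivered as []byte or string (assumed behaviour).
func (j *job) Scan(value interface{}) error {
	switch v := value.(type) {
	case []byte:
		return json.Unmarshal(v, j)
	case string:
		return json.Unmarshal([]byte(v), j)
	default:
		return fmt.Errorf("unsupported type %T", value)
	}
}

// roundTrip encodes a job with Value and decodes it again with Scan.
func roundTrip(in job) (job, error) {
	v, err := in.Value()
	if err != nil {
		return job{}, err
	}
	var out job
	if err := out.Scan(v); err != nil {
		return job{}, err
	}
	return out, nil
}

func main() {
	in := job{RepositoryID: 1, ReplicaPath: "replica/path", Change: "update",
		RelativePath: "repo.git", TargetNodeStorage: "gitaly-2",
		SourceNodeStorage: "gitaly-1", VirtualStorage: "default"}
	v, _ := in.Value()
	fmt.Println(v)
	out, err := roundTrip(in)
	fmt.Println(out == in, err)
}
```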

type RepositoryExistsError

type RepositoryExistsError struct {
	// contains filtered or unexported fields
}

RepositoryExistsError is returned when trying to create a repository that already exists.

func (RepositoryExistsError) Error

func (err RepositoryExistsError) Error() string

Error returns the error's message.

func (RepositoryExistsError) Is

func (err RepositoryExistsError) Is(other error) bool

Is reports whether the other error is of the same type.
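A type-based Is method lets callers match the error with errors.Is even when it has been wrapped. The sketch below uses a hypothetical `repositoryExistsError` with invented fields, since the real type's fields are unexported and not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// repositoryExistsError is a hypothetical stand-in; its fields are invented.
type repositoryExistsError struct {
	virtualStorage string
	relativePath   string
}

// Error returns the error's message.
func (err repositoryExistsError) Error() string {
	return fmt.Sprintf("repository %q/%q already exists", err.virtualStorage, err.relativePath)
}

// Is reports whether other has the same type, ignoring field values.
func (err repositoryExistsError) Is(other error) bool {
	_, ok := other.(repositoryExistsError)
	return ok
}

// createRepository is a hypothetical caller that wraps the error.
func createRepository(exists bool) error {
	if exists {
		return fmt.Errorf("create repository: %w", repositoryExistsError{
			virtualStorage: "default",
			relativePath:   "repo.git",
		})
	}
	return nil
}

// isRepositoryExists matches on type by comparing against a zero value.
func isRepositoryExists(err error) bool {
	return errors.Is(err, repositoryExistsError{})
}

func main() {
	fmt.Println(isRepositoryExists(createRepository(true)))
	fmt.Println(isRepositoryExists(createRepository(false)))
}
```

Matching against a zero value works because errors.Is falls back to the Is method when the two values are not equal.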

type RepositoryMetadata added in v14.6.0

type RepositoryMetadata struct {
	// RepositoryID is the internal id of the repository.
	RepositoryID int64
	// VirtualStorage is the virtual storage where the repository is.
	VirtualStorage string
	// RelativePath is the relative path of the repository.
	RelativePath string
	// ReplicaPath is the actual disk location where the replicas are stored in the storages.
	ReplicaPath string
	// Primary is the current primary of this repository.
	Primary string
	// Generation is the current generation of the repository.
	Generation int64
	// Replicas contains information of the repository on each storage that contains the repository
	// or does not contain the repository but is assigned to host it.
	Replicas []Replica
}

RepositoryMetadata contains the repository's metadata.

type RepositoryNotExistsError

type RepositoryNotExistsError struct {
	// contains filtered or unexported fields
}

RepositoryNotExistsError is returned when trying to perform an operation on a non-existent repository.

func (RepositoryNotExistsError) Error

func (err RepositoryNotExistsError) Error() string

Error returns the error's message.

func (RepositoryNotExistsError) Is

func (err RepositoryNotExistsError) Is(other error) bool

Is reports whether the other error is of the same type.

type RepositoryStore

type RepositoryStore interface {
	// GetGeneration gets the repository's generation on a given storage.
	GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
	// IncrementGeneration increments the generations of up to date nodes.
	IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
	// SetGeneration sets the repository's generation on the given storage. If the generation is higher
	// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
	SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
	// GetReplicaPath gets the replica path of a repository. Returns a commonerr.ErrRepositoryNotFound if a record
	// for the repository ID is not found.
	GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
	// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
	// downgrade, a DowngradeAttemptedError is returned.
	GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
	// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
	// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
	// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
	// the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store.
	//
	// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as
	// the repository's primary.
	//
	// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
	// secondaries are stored as the assigned hosts of the repository.
	CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
	// SetAuthoritativeReplica sets the given replica of a repository as the authoritative one by setting its generation as the latest one.
	SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
	// DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages
	// which are known to have a replica at the time of deletion. commonerr.RepositoryNotFoundError is returned when
	// the repository is not tracked by the Praefect datastore.
	DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
	// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
	DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
	// RenameRepository updates a repository's relative path. It renames the virtual storage wide record as well
	// as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository
	// which has no record in the virtual storage or the storage.
	RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
	// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
	GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
	ConsistentStoragesGetter
	// RepositoryExists returns whether the repository exists on a virtual storage.
	RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
	// GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which
	// are not able to serve requests at the moment.
	GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
	// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
	// record of the invalid repository. If the storage was the only storage with the repository, the repository's
	// record on the virtual storage is also deleted.
	DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
	// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already
	// exists with the given virtual storage and relative path combination, an error is returned.
	ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a
	// RepositoryNotFoundError if the repository doesn't exist.
	GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
	// GetRepositoryMetadata retrieves a repository's metadata.
	GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
	// GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path.
	GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
}

RepositoryStore provides access to repository state.
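The generation bookkeeping described in the interface comments can be illustrated with an in-memory model. This is a sketch under stated assumptions, not the store's implementation: `repoState` and its methods are hypothetical, a single repository is tracked, and "up to date" is taken to mean "on the latest generation".

```go
package main

import (
	"fmt"
	"sort"
)

// repoState is a hypothetical model of one repository's generations.
type repoState struct {
	generation int            // latest generation known for the repository
	replicas   map[string]int // generation of each storage's replica
}

func newRepoState() *repoState { return &repoState{replicas: map[string]int{}} }

// setGeneration records a storage's generation and raises the latest
// generation to match when the new value is higher, keeping it monotonic.
func (r *repoState) setGeneration(storage string, generation int) {
	r.replicas[storage] = generation
	if generation > r.generation {
		r.generation = generation
	}
}

// incrementGeneration bumps only those participants that are currently on
// the latest generation; outdated replicas stay behind.
func (r *repoState) incrementGeneration(primary string, secondaries []string) {
	next := r.generation + 1
	updated := false
	for _, storage := range append([]string{primary}, secondaries...) {
		if gen, ok := r.replicas[storage]; ok && gen == r.generation {
			r.replicas[storage] = next
			updated = true
		}
	}
	if updated {
		r.generation = next
	}
}

// consistentStorages returns the storages on the latest generation, sorted.
func (r *repoState) consistentStorages() []string {
	var out []string
	for storage, gen := range r.replicas {
		if gen == r.generation {
			out = append(out, storage)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRepoState()
	for _, s := range []string{"gitaly-1", "gitaly-2", "gitaly-3"} {
		r.setGeneration(s, 0)
	}
	// gitaly-3 did not take part in the write, so it falls behind.
	r.incrementGeneration("gitaly-1", []string{"gitaly-2"})
	fmt.Println(r.generation, r.consistentStorages())
}
```

A replica that is behind the latest generation is the kind of outdated state that a replication job would then bring up to date.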

type RepositoryStoreCollector

type RepositoryStoreCollector struct {
	// contains filtered or unexported fields
}

RepositoryStoreCollector collects metrics from the RepositoryStore.

func NewRepositoryStoreCollector

func NewRepositoryStoreCollector(
	log logrus.FieldLogger,
	virtualStorages []string,
	db glsql.Querier,
	timeout time.Duration,
) *RepositoryStoreCollector

NewRepositoryStoreCollector returns a new collector.

func (*RepositoryStoreCollector) Collect

func (c *RepositoryStoreCollector) Collect(ch chan<- prometheus.Metric)

Collect matches the Collect method of the prometheus.Collector interface. It is undocumented in the source.

func (*RepositoryStoreCollector) Describe

func (c *RepositoryStoreCollector) Describe(ch chan<- *prometheus.Desc)

Describe matches the Describe method of the prometheus.Collector interface. It is undocumented in the source.

type ResilientListener added in v14.7.0

type ResilientListener struct {
	// contains filtered or unexported fields
}

ResilientListener allows listening for notifications resiliently.

func NewResilientListener added in v14.7.0

func NewResilientListener(conf config.DB, ticker helper.Ticker, logger logrus.FieldLogger) *ResilientListener

NewResilientListener returns an instance of the *ResilientListener.

func (*ResilientListener) Collect added in v14.7.0

func (rl *ResilientListener) Collect(metrics chan<- promclient.Metric)

Collect returns the set of metrics collected during execution.

func (*ResilientListener) Describe added in v14.7.0

func (rl *ResilientListener) Describe(descs chan<- *promclient.Desc)

Describe returns the description of the metric.

func (*ResilientListener) Listen added in v14.7.0

func (rl *ResilientListener) Listen(ctx context.Context, handler glsql.ListenHandler, channels ...string) error

Listen starts a new Listener and listens for notifications on the channels. If an error occurs and the connection is closed or terminated, another Listener is created after a wait period. The method returns only when the provided context is cancelled or an invalid configuration is used.
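The reconnect-until-cancelled behaviour can be sketched as a loop around a blocking listen call. The names `listenResiliently` and `demoListen` are hypothetical, the wait uses time.After rather than the package's ticker, and the invalid-configuration exit is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// listenResiliently calls listen, which is expected to block until the
// connection is lost, and starts it again after wait. It returns only when
// ctx is cancelled.
func listenResiliently(ctx context.Context, listen func(context.Context) error, wait time.Duration) error {
	for {
		_ = listen(ctx) // a real implementation would log this error
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
}

// demoListen simulates two lost connections followed by a cancellation and
// reports how many times listen was started.
func demoListen() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	listen := func(ctx context.Context) error {
		calls++
		if calls == 3 {
			cancel()
			<-ctx.Done()
			return ctx.Err()
		}
		return errors.New("connection lost")
	}
	err := listenResiliently(ctx, listen, time.Millisecond)
	return calls, err
}

func main() {
	calls, err := demoListen()
	fmt.Println(calls, err)
}
```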

type StorageCleanup added in v14.0.12

type StorageCleanup struct {
	// contains filtered or unexported fields
}

StorageCleanup provides methods on the database for the repository cleanup operation.

func NewStorageCleanup added in v14.0.12

func NewStorageCleanup(db *sql.DB) *StorageCleanup

NewStorageCleanup initialises and returns a new instance of the StorageCleanup.

func (*StorageCleanup) AcquireNextStorage added in v14.4.0

func (ss *StorageCleanup) AcquireNextStorage(ctx context.Context, inactive, updatePeriod time.Duration) (*ClusterPath, func() error, error)

AcquireNextStorage picks up the next storage for processing. Once acquired, no other call to the same method will return the same storage, so it works as an exclusive lock on that entry. Once processing is done, the returned function must be called to release the acquired storage. It updates the last_run column of the entry on execution.
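The acquire-and-release contract can be modelled in memory as below. This is a sketch with several assumptions: `cleanupSet` and `storageEntry` are hypothetical, `inactive` is read as the minimum time since the last run before a storage becomes eligible again, the release function is what records the last run, and the `updatePeriod` parameter is not modelled.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// storageEntry is a hypothetical record of one storage in the set.
type storageEntry struct {
	virtualStorage string
	storage        string
	lastRun        time.Time
	held           bool
}

// cleanupSet is a hypothetical in-memory stand-in for the database table.
type cleanupSet struct {
	mu      sync.Mutex
	entries []*storageEntry
}

// populate adds the storage to the set unless it is already present.
func (s *cleanupSet) populate(virtualStorage, storage string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, e := range s.entries {
		if e.virtualStorage == virtualStorage && e.storage == storage {
			return
		}
	}
	s.entries = append(s.entries, &storageEntry{virtualStorage: virtualStorage, storage: storage})
}

// acquireNext returns the first storage that is not held and has not run
// within inactive, together with a function that releases it. It returns
// nil when no storage is eligible.
func (s *cleanupSet) acquireNext(inactive time.Duration) (*storageEntry, func() error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, e := range s.entries {
		if e.held || time.Since(e.lastRun) < inactive {
			continue
		}
		e.held = true
		entry := e
		release := func() error {
			s.mu.Lock()
			defer s.mu.Unlock()
			entry.held = false
			entry.lastRun = time.Now()
			return nil
		}
		return entry, release
	}
	return nil, func() error { return nil }
}

func main() {
	s := &cleanupSet{}
	s.populate("default", "gitaly-1")

	entry, release := s.acquireNext(time.Hour)
	fmt.Println("acquired:", entry.storage)

	other, _ := s.acquireNext(time.Hour)
	fmt.Println("second caller got a storage:", other != nil)

	release()
	again, _ := s.acquireNext(time.Hour)
	fmt.Println("eligible right after release:", again != nil)
}
```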

func (*StorageCleanup) DoesntExist added in v14.0.12

func (ss *StorageCleanup) DoesntExist(ctx context.Context, virtualStorage, storage string, replicaPaths []string) ([]string, error)

DoesntExist returns the replica path of each repository that doesn't exist in the database, determined by querying the repositories and storage_repositories tables.

func (*StorageCleanup) Populate added in v14.4.0

func (ss *StorageCleanup) Populate(ctx context.Context, virtualStorage, storage string) error

Populate adds the storage to the set so that it can be acquired afterwards.

Directories

Path Synopsis
Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.
