praefect

package
v16.9.1
Published: Feb 20, 2024 License: MIT Imports: 73 Imported by: 0

Documentation

Overview

Package praefect is a Gitaly reverse proxy for transparently routing gRPC calls to a set of Gitaly services.

Constants

This section is empty.

Variables

var ErrNoHealthyNodes = errors.New("no healthy nodes")

ErrNoHealthyNodes is returned when there are no healthy nodes to serve a request.

var ErrNoSuitableNode = errors.New("no suitable node to serve the request")

ErrNoSuitableNode is returned when there is no suitable node to serve a request.

var ErrRepositoryReadOnly = structerr.NewFailedPrecondition("repository is in read-only mode")

ErrRepositoryReadOnly is returned when the repository is in read-only mode. This happens if the primary does not have the latest changes.

Functions

func DeleteObjectPoolHandler

func DeleteObjectPoolHandler(rs datastore.RepositoryStore, logger log.Logger, conns Connections) grpc.StreamHandler

DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and deletes the object pool from every backing Gitaly node.

func GetObjectPoolHandler added in v16.2.0

func GetObjectPoolHandler(repoStore datastore.RepositoryStore, router Router) grpc.StreamHandler

GetObjectPoolHandler intercepts GetObjectPool RPC calls and rewrites replica path and storage responses to reflect Praefect state of the object pool.

func NewBackchannelServerFactory

func NewBackchannelServerFactory(logger log.Logger, refSvc gitalypb.RefTransactionServer, registry *sidechannel.Registry) backchannel.ServerFactory

NewBackchannelServerFactory returns a ServerFactory that serves the RefTransactionServer on the backchannel connection.

func NewGRPCServer

func NewGRPCServer(
	deps *Dependencies,
	creds credentials.TransportCredentials,
	opts ...ServerOption,
) *grpc.Server

NewGRPCServer returns a gRPC server with the proxy handler registered, along with the services that Praefect serves itself. The server includes the unary and stream interceptors required for logging, authentication, and so on.

func RemoveAllHandler

func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler

RemoveAllHandler intercepts RemoveAll calls and deletes the database records before calling each Gitaly node.

func RemoveRepositoryHandler

func RemoveRepositoryHandler(rs datastore.RepositoryStore, logger log.Logger, conns Connections) grpc.StreamHandler

RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and deletes the repository from every backing Gitaly node.

func ReplicateRepositoryHandler added in v16.3.0

func ReplicateRepositoryHandler(coordinator *Coordinator) grpc.StreamHandler

ReplicateRepositoryHandler intercepts ReplicateRepository RPC calls.

func RepositoryExistsHandler

func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler

RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking whether there is a record of the repository in the database.

func RunPraefectServer

func RunPraefectServer(
	tb testing.TB,
	ctx context.Context,
	conf config.Config,
	opt BuildOptions,
) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup)

RunPraefectServer starts a Praefect service based on the passed-in configuration and options. The caller is responsible for calling the returned testhelper.Cleanup to stop the service and release all acquired resources. The function should be used only for testing purposes and not as part of production code.

func WalkReposHandler added in v16.9.0

func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler

WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling through Praefect. Instead of walking the storage directory in the filesystem, this Praefect implementation queries the database for all known repositories in the given virtual storage. As a consequence, the modification_time parameter can't be populated in the response.

func WithDelayMetric

func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithDelayMetric is an option to set the delay prometheus metric

func WithDequeueBatchSize

func WithDequeueBatchSize(size uint) func(*ReplMgr)

WithDequeueBatchSize configures the number of events to dequeue in a single batch.

func WithLatencyMetric

func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithLatencyMetric is an option to set the latency prometheus metric

func WithMockBackends

func WithMockBackends(tb testing.TB, backendRegistrars map[string]func(*grpc.Server)) func([]*config.VirtualStorage) []testhelper.Cleanup

WithMockBackends mocks backends with a set of passed-in functions that know how to register services on a gRPC server.

func WithParallelStorageProcessingWorkers

func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr)

WithParallelStorageProcessingWorkers configures the number of workers used to process replication events per virtual storage.

Types

type AssignmentGetter

type AssignmentGetter interface {
	// GetHostAssignments returns the names of the storages assigned to host the repository.
	// The primary node must always be assigned.
	GetHostAssignments(ctx context.Context, virtualStorage string, repositoryID int64) ([]string, error)
}

AssignmentGetter is an interface for getting repository host node assignments.

type AssignmentStore

type AssignmentStore interface {
	AssignmentGetter
	// SetReplicationFactor sets a repository's replication factor to the desired value and returns the
	// resulting assignments.
	SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
}

AssignmentStore is the interface which Praefect uses to operate on repository assignments.

func NewDisabledAssignmentStore

func NewDisabledAssignmentStore(storages map[string][]string) AssignmentStore

NewDisabledAssignmentStore returns an assignments store that can be used if no database is configured. It returns every configured storage as assigned and errors when trying to set assignments.

type Backoff

type Backoff func() time.Duration

Backoff returns next backoff.

type BackoffFactory

type BackoffFactory interface {
	// Create returns a new backoff provider and a reset function for it.
	Create() (Backoff, BackoffReset)
}

BackoffFactory creates a backoff function paired with a reset function for it.

type BackoffReset

type BackoffReset func()

BackoffReset resets backoff provider.

type BuildOptions

type BuildOptions struct {
	// WithQueue sets an implementation of the replication queue to use by praefect service.
	WithQueue datastore.ReplicationEventQueue
	// WithTxMgr sets the transaction manager to use by praefect service.
	WithTxMgr *transactions.Manager
	// WithBackends sets a callback that is triggered during initialization.
	WithBackends func([]*config.VirtualStorage) []testhelper.Cleanup
	// WithAnnotations sets a proto-registry to use by praefect service.
	WithAnnotations *protoregistry.Registry
	// WithLogger sets a logger to use by praefect service.
	WithLogger log.Logger
	// WithNodeMgr sets an implementation of the node manager to use by praefect service.
	WithNodeMgr nodes.Manager
	// WithRepoStore sets an implementation of the repositories store to use by praefect service.
	WithRepoStore datastore.RepositoryStore
	// WithAssignmentStore sets an implementation of the assignment store to use by praefect service.
	WithAssignmentStore AssignmentStore
	// WithConnections sets a set of connections to gitalies.
	WithConnections Connections
	// WithPrimaryGetter sets an implementation of the primary node getter to use by praefect service.
	WithPrimaryGetter PrimaryGetter
	// WithRouter sets an implementation of the request router to use by praefect service.
	WithRouter Router
	// WithChecks sets a list of checks to run when ReadinessCheck RPC is called.
	WithChecks []service.CheckFunc
}

BuildOptions is a set of configuration options that can be set to configure the praefect service.

type Connections

type Connections map[string]map[string]*grpc.ClientConn

Connections is a set of connections to configured storage nodes by their virtual storages.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator takes care of directing client requests to the appropriate downstream server. The coordinator is thread safe; concurrent calls to register nodes are safe.

func NewCoordinator

NewCoordinator returns a new Coordinator that utilizes the provided logger

func (*Coordinator) Collect

func (c *Coordinator) Collect(metrics chan<- prometheus.Metric)

func (*Coordinator) Describe

func (c *Coordinator) Describe(descs chan<- *prometheus.Desc)

func (*Coordinator) StreamDirector

func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error)

StreamDirector determines which downstream servers receive requests

type Dependencies added in v16.3.0

type Dependencies struct {
	Config          config.Config
	Logger          log.Logger
	Coordinator     *Coordinator
	Director        proxy.StreamDirector
	NodeMgr         nodes.Manager
	TxMgr           *transactions.Manager
	Queue           datastore.ReplicationEventQueue
	RepositoryStore datastore.RepositoryStore
	AssignmentStore AssignmentStore
	Router          Router
	Registry        *protoregistry.Registry
	Conns           Connections
	PrimaryGetter   PrimaryGetter
	Checks          []service.CheckFunc
}

Dependencies consolidates Praefect service dependencies for injection.

type ExpBackoffFactory

type ExpBackoffFactory struct {
	Start, Max time.Duration
}

ExpBackoffFactory creates exponentially growing durations.

func (ExpBackoffFactory) Create

func (b ExpBackoffFactory) Create() (Backoff, BackoffReset)

Create returns a backoff function based on Start and Max time durations.

type HealthChecker

type HealthChecker interface {
	// HealthyNodes gets a list of healthy storages by their virtual storage.
	HealthyNodes() map[string][]string
}

HealthChecker manages information of locally healthy nodes.

type MetadataVerifier

type MetadataVerifier struct {
	// contains filtered or unexported fields
}

MetadataVerifier verifies the repository metadata against the actual replicas on the Gitaly nodes. It queries the database for replicas that haven't been verified in a given time and checks whether the Gitalys still have them. If a Gitaly doesn't have a replica, the replica's metadata record is removed and the removal logged. The repository's record is still left in place even if all of the replicas are lost to ensure the data loss doesn't go unnoticed.

func NewMetadataVerifier

func NewMetadataVerifier(
	log log.Logger,
	db glsql.Querier,
	conns Connections,
	healthChecker HealthChecker,
	verificationInterval time.Duration,
	performDeletions bool,
) *MetadataVerifier

NewMetadataVerifier creates a new MetadataVerifier.

func (*MetadataVerifier) Collect

func (v *MetadataVerifier) Collect(ch chan<- prometheus.Metric)

Collect collects the metrics exposed from the MetadataVerifier.

func (*MetadataVerifier) Describe

func (v *MetadataVerifier) Describe(ch chan<- *prometheus.Desc)

Describe describes the collected metrics to Prometheus.

func (*MetadataVerifier) Run

func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error

Run runs the metadata verifier. It keeps running until the context is canceled.

func (*MetadataVerifier) RunExpiredLeaseReleaser

func (v *MetadataVerifier) RunExpiredLeaseReleaser(ctx context.Context, ticker helper.Ticker) error

RunExpiredLeaseReleaser releases expired leases on every tick. It keeps running until the context is canceled.

type Node

type Node struct {
	// Storage is the name of the storage node.
	Storage string
	// Address is the address of the node.
	Address string
	// Token is the authentication token of the node.
	Token string
	// Connection is a gRPC connection to the storage node.
	Connection *grpc.ClientConn
}

Node is a storage node in a virtual storage.

type NodeSet

type NodeSet map[string]map[string]Node

NodeSet contains nodes by their virtual storage and storage names.

func DialNodes

func DialNodes(
	ctx context.Context,
	virtualStorages []*config.VirtualStorage,
	registry *protoregistry.Registry,
	errorTracker tracker.ErrorTracker,
	handshaker client.Handshaker,
	sidechannelRegistry *sidechannel.Registry,
	log log.Logger,
) (NodeSet, error)

DialNodes dials the configured storage nodes.

func NodeSetFromNodeManager

func NodeSetFromNodeManager(mgr nodes.Manager) NodeSet

NodeSetFromNodeManager converts connections set up by the node manager into a NodeSet. This is a temporary adapter required due to cyclic imports between the praefect and nodes packages.

func (NodeSet) Close

func (set NodeSet) Close()

Close closes the connections in the NodeSet. Errors on closing are ignored.

func (NodeSet) Connections

func (set NodeSet) Connections() Connections

Connections is a convenience method to return the connections from the NodeSet.
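
Both NodeSet and Connections are nested maps keyed first by virtual storage and then by storage name, so the conversion amounts to copying the connection out of each Node. The sketch below shows one plausible way to do that. The clientConn type stands in for *grpc.ClientConn so that the example needs only the standard library, and the method name toConnections and the sample address are illustrative.

```go
package main

import "fmt"

// clientConn is a stand-in for *grpc.ClientConn in this sketch.
type clientConn struct{ target string }

type node struct {
	Storage    string
	Address    string
	Connection *clientConn
}

// nodeSet and connections are keyed by virtual storage, then by storage name.
type nodeSet map[string]map[string]node

type connections map[string]map[string]*clientConn

// toConnections extracts the connection of every node in the set.
func (set nodeSet) toConnections() connections {
	conns := make(connections, len(set))
	for virtualStorage, nodes := range set {
		conns[virtualStorage] = make(map[string]*clientConn, len(nodes))
		for storage, n := range nodes {
			conns[virtualStorage][storage] = n.Connection
		}
	}
	return conns
}

func main() {
	set := nodeSet{
		"default": {
			"gitaly-1": {
				Storage:    "gitaly-1",
				Address:    "tcp://gitaly-1.example:8075",
				Connection: &clientConn{target: "gitaly-1.example:8075"},
			},
		},
	}

	conn := set.toConnections()["default"]["gitaly-1"]
	fmt.Println(conn.target)
}
```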

func (NodeSet) HealthClients

func (set NodeSet) HealthClients() nodes.HealthClients

HealthClients is a convenience method to return the HealthClients from the NodeSet.

type PerRepositoryRouter

type PerRepositoryRouter struct {
	// contains filtered or unexported fields
}

PerRepositoryRouter implements a router that routes requests respecting per repository primary nodes.

func NewPerRepositoryRouter

func NewPerRepositoryRouter(
	conns Connections,
	pg PrimaryGetter,
	hc HealthChecker,
	rand Random,
	csg datastore.ConsistentStoragesGetter,
	ag AssignmentGetter,
	rs datastore.RepositoryStore,
	defaultReplicationFactors map[string]int,
) *PerRepositoryRouter

NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration.

func (*PerRepositoryRouter) RouteRepositoryAccessor

func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)

func (*PerRepositoryRouter) RouteRepositoryCreation

func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error)

RouteRepositoryCreation routes an incoming repository creation to a set of target nodes that will be designated to hold the new repository.

func (*PerRepositoryRouter) RouteRepositoryMaintenance

func (r *PerRepositoryRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)

RouteRepositoryMaintenance will route the maintenance call to all healthy nodes in a best-effort strategy. We do not raise an error in case the primary node is unhealthy, but will in case all nodes are unhealthy.

func (*PerRepositoryRouter) RouteRepositoryMutator

func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error)

func (*PerRepositoryRouter) RouteStorageAccessor

func (r *PerRepositoryRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)

RouteStorageAccessor routes requests for storage-scoped accessor RPCs. The only storage scoped accessor RPC is RemoteService/FindRemoteRepository, which in turn executes a command without a repository. This can be done by any Gitaly server as it doesn't depend on the state on the server.

func (*PerRepositoryRouter) RouteStorageMutator

func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)

RouteStorageMutator is not implemented here. The only storage scoped mutator RPC is related to namespace operations. These are not relevant anymore, given hashed storage is default everywhere, and should be eventually removed.

type PrimaryGetter

type PrimaryGetter interface {
	// GetPrimary returns the primary storage for a given repository.
	GetPrimary(ctx context.Context, virtualStorage string, repositoryID int64) (string, error)
}

PrimaryGetter is an interface for getting a primary of a repository.

type Random

type Random interface {
	// Intn returns a random integer in the range [0,n).
	Intn(n int) int
	// Shuffle pseudo-randomizes the order of elements. n is the number of elements.
	// Shuffle panics if n < 0. swap swaps the elements with indexes i and j.
	Shuffle(n int, swap func(i, j int))
}

Random is the interface of the Go random number generator.

func NewLockedRandom

func NewLockedRandom(r Random) Random

NewLockedRandom wraps the passed in Random to make it safe for concurrent use.

type ReplMgr

type ReplMgr struct {
	// contains filtered or unexported fields
}

ReplMgr is a replication manager for handling replication jobs

func NewReplMgr

func NewReplMgr(log log.Logger, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr

NewReplMgr initializes a replication manager with the provided dependencies and options

func (ReplMgr) Collect

func (r ReplMgr) Collect(ch chan<- prometheus.Metric)

func (ReplMgr) Describe

func (r ReplMgr) Describe(ch chan<- *prometheus.Desc)

func (ReplMgr) ProcessBacklog

func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory)

ProcessBacklog starts processing queued jobs and continues until ctx is done. It blocks until all backlog-processing goroutines have returned.

func (ReplMgr) ProcessReplicationEvent

func (r ReplMgr) ProcessReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error

ProcessReplicationEvent processes a single replication event given the target client connection

func (ReplMgr) ProcessStale

func (r ReplMgr) ProcessStale(ctx context.Context, ticker helper.Ticker, staleAfter time.Duration) chan struct{}

ProcessStale starts a background process to acknowledge stale replication jobs. It will process jobs until ctx is Done.

type ReplMgrOpt

type ReplMgrOpt func(*ReplMgr)

ReplMgrOpt allows a replicator to be configured with additional options
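
The With... functions listed earlier return values of this type, following the functional options pattern. Here is a minimal standalone sketch of how a constructor might apply such options. The struct fields and the default values are assumptions made for the example, since ReplMgr's fields are unexported and not documented here.

```go
package main

import "fmt"

// replMgr is a stand-in with hypothetical fields.
type replMgr struct {
	dequeueBatchSize uint
	parallelWorkers  uint
}

// replMgrOpt mirrors the shape of ReplMgrOpt.
type replMgrOpt func(*replMgr)

func withDequeueBatchSize(size uint) replMgrOpt {
	return func(m *replMgr) { m.dequeueBatchSize = size }
}

func withParallelStorageProcessingWorkers(n uint) replMgrOpt {
	return func(m *replMgr) { m.parallelWorkers = n }
}

// newReplMgr sets defaults first, then lets each option override them.
func newReplMgr(opts ...replMgrOpt) replMgr {
	m := replMgr{dequeueBatchSize: 10, parallelWorkers: 1} // assumed defaults
	for _, opt := range opts {
		opt(&m)
	}
	return m
}

func main() {
	m := newReplMgr(withDequeueBatchSize(50))
	fmt.Println(m.dequeueBatchSize, m.parallelWorkers)
}
```

Options are applied in order, so a later option overrides an earlier one that sets the same field.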

type Replicator

type Replicator interface {
	// Replicate propagates changes from the source to the target
	Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
	// Destroy will remove the target repo on the specified target connection
	Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}

Replicator performs the actual replication logic between two nodes

type RepositoryAccessorRoute

type RepositoryAccessorRoute struct {
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// Node contains the details of the node that should handle the request.
	Node RouterNode
}

RepositoryAccessorRoute describes how to route a repository scoped accessor call.

type RepositoryMaintenanceRoute

type RepositoryMaintenanceRoute struct {
	// RepositoryID is the repository's ID as Praefect identifies it.
	RepositoryID int64
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// Nodes contains all the nodes the call should be routed to.
	Nodes []RouterNode
}

RepositoryMaintenanceRoute describes how to route a repository scoped maintenance call.

type RepositoryMutatorRoute

type RepositoryMutatorRoute struct {
	// RepositoryID is the repository's ID as Praefect identifies it.
	RepositoryID int64
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// AdditionalReplicaPath is the disk path where the possible additional repository in the request
	// is stored. This is only used for object pools.
	AdditionalReplicaPath string
	// Primary is the primary node of the transaction.
	Primary RouterNode
	// Secondaries are the secondary nodes participating in a transaction.
	Secondaries []RouterNode
	// ReplicationTargets are additional nodes that do not participate in a transaction
	// but need the changes replicated.
	ReplicationTargets []string
}

RepositoryMutatorRoute describes how to route a repository scoped mutator call.

type Router

type Router interface {
	// RouteStorageAccessor returns the node which should serve the storage accessor request.
	RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)
	// RouteStorageMutator returns a route to primary and secondary nodes that should handle the
	// storage mutator request.
	RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
	// RouteRepositoryAccessor returns the node that should serve the repository accessor
	// request. If forcePrimary is set to `true`, it returns the primary node.
	RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
	// RouteRepositoryMutator returns a route to primary and secondary nodes that should handle the
	// repository mutator request. Additionally, it returns nodes which do not participate in the
	// transaction, but to which the change should be replicated. RouteRepositoryMutator should only
	// be used with existing repositories.
	RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
	// RouteRepositoryCreation returns the primary and secondaries that should handle the repository creation
	// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
	RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
	// RouteRepositoryMaintenance routes the given maintenance-style RPC to all nodes which
	// should perform maintenance. This would typically include all online nodes, regardless of
	// whether the repository hosted by them is up-to-date or not. Maintenance tasks should
	// never be replicated.
	RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}

Router decides which nodes to direct accessor and mutator RPCs to.

func NewNodeManagerRouter

func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router

NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.

type RouterNode

type RouterNode struct {
	// Storage is storage of the node.
	Storage string
	// Connection is the connection to the node.
	Connection *grpc.ClientConn
}

RouterNode is a subset of a node's configuration needed to perform request routing.

type ServerFactory

type ServerFactory struct {
	// contains filtered or unexported fields
}

ServerFactory is a factory of praefect grpc servers

func NewServerFactory

func NewServerFactory(
	deps *Dependencies,
	opts ...ServerOption,
) *ServerFactory

NewServerFactory returns factory object for initialization of praefect gRPC servers.

func (*ServerFactory) Create

func (s *ServerFactory) Create(secure bool) (*grpc.Server, error)

Create returns a newly instantiated gRPC server, initialized with interceptors.

func (*ServerFactory) GracefulStop

func (s *ServerFactory) GracefulStop()

GracefulStop stops both the secure and insecure servers gracefully.

func (*ServerFactory) Serve

func (s *ServerFactory) Serve(l net.Listener, secure bool) error

Serve starts serving on the provided listener with a newly created grpc.Server.

func (*ServerFactory) Stop

func (s *ServerFactory) Stop()

Stop stops all servers created by the factory.

type ServerOption added in v16.1.0

type ServerOption func(cfg *serverConfig)

ServerOption is an option that can be passed to `NewGRPCServer()`.

func WithStreamInterceptor added in v16.1.0

func WithStreamInterceptor(interceptor grpc.StreamServerInterceptor) ServerOption

WithStreamInterceptor adds another interceptor that shall be executed for streaming RPC calls.

func WithUnaryInterceptor added in v16.1.0

func WithUnaryInterceptor(interceptor grpc.UnaryServerInterceptor) ServerOption

WithUnaryInterceptor adds another interceptor that shall be executed for unary RPC calls.

type StaticHealthChecker

type StaticHealthChecker map[string][]string

StaticHealthChecker returns the nodes as always healthy.

func (StaticHealthChecker) HealthyNodes

func (healthyNodes StaticHealthChecker) HealthyNodes() map[string][]string
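
Since StaticHealthChecker has the same underlying type as the return value of HealthyNodes, a natural implementation returns the map itself. The sketch below assumes exactly that and uses local lowercase names; it is useful mainly as a picture of how a fixed health view can be plugged in wherever a HealthChecker is expected, for example in tests.

```go
package main

import "fmt"

// healthChecker mirrors the HealthChecker interface documented above.
type healthChecker interface {
	HealthyNodes() map[string][]string
}

// staticHealthChecker maps virtual storages to storages that are always healthy.
type staticHealthChecker map[string][]string

// HealthyNodes returns the configured map unchanged (assumed behavior).
func (healthyNodes staticHealthChecker) HealthyNodes() map[string][]string {
	return healthyNodes
}

func main() {
	var hc healthChecker = staticHealthChecker{
		"default": {"gitaly-1", "gitaly-2"},
	}
	fmt.Println(hc.HealthyNodes()["default"])
}
```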

type StorageMutatorRoute

type StorageMutatorRoute struct {
	// Primary is the primary node of the routing decision.
	Primary RouterNode
	// Secondaries are the secondary nodes of the routing decision.
	Secondaries []RouterNode
}

StorageMutatorRoute describes how to route a storage scoped mutator call.

Directories

Path            Synopsis
datastore       Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
advisorylock    Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
glsql           Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.