praefect

package
v15.5.0
Warning

This package is not in the latest version of its module.

Published: Oct 21, 2022 License: MIT Imports: 78 Imported by: 0

Documentation

Overview

Package praefect is a Gitaly reverse proxy for transparently routing gRPC calls to a set of Gitaly services.

Index

Constants

This section is empty.

Variables

var ErrNoHealthyNodes = errors.New("no healthy nodes")

ErrNoHealthyNodes is returned when there are no healthy nodes to serve a request.

var ErrNoSuitableNode = errors.New("no suitable node to serve the request")

ErrNoSuitableNode is returned when there is no suitable node to serve a request.

var ErrRepositoryReadOnly = helper.ErrFailedPreconditionf("repository is in read-only mode")

ErrRepositoryReadOnly is returned when the repository is in read-only mode. This happens if the primary does not have the latest changes.
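Because ErrNoHealthyNodes and ErrNoSuitableNode are plain sentinel errors created with errors.New, a caller can match them with errors.Is even after they have been wrapped with extra context. The sketch below redeclares both variables locally so it runs without importing Gitaly (in real code you would reference praefect.ErrNoHealthyNodes directly), and `pickNode` is a hypothetical helper. ErrRepositoryReadOnly is left out because it is built with helper.ErrFailedPreconditionf rather than errors.New.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for praefect.ErrNoHealthyNodes and praefect.ErrNoSuitableNode,
// declared here only so the sketch runs without importing Gitaly.
var (
	ErrNoHealthyNodes = errors.New("no healthy nodes")
	ErrNoSuitableNode = errors.New("no suitable node to serve the request")
)

// pickNode is a hypothetical helper that wraps the sentinel with extra context.
func pickNode(healthy []string) (string, error) {
	if len(healthy) == 0 {
		return "", fmt.Errorf("route repository accessor: %w", ErrNoHealthyNodes)
	}
	return healthy[0], nil
}

// classify matches the sentinels with errors.Is, which sees through %w wrapping.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNoHealthyNodes):
		return "no healthy nodes"
	case errors.Is(err, ErrNoSuitableNode):
		return "no suitable node"
	default:
		return "other"
	}
}

func main() {
	_, err := pickNode(nil)
	fmt.Println(err)           // route repository accessor: no healthy nodes
	fmt.Println(classify(err)) // no healthy nodes
}
```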

Functions

func DeleteObjectPoolHandler

func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler

DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and deletes the object pool from every backing Gitaly node.

func GetBuildTime

func GetBuildTime() string

GetBuildTime returns the time at which the build took place.

func GetVersion

func GetVersion() string

GetVersion returns the semver-compatible version number.

func GetVersionString

func GetVersionString() string

GetVersionString returns a standard version header.

func NewBackchannelServerFactory

func NewBackchannelServerFactory(logger *logrus.Entry, refSvc gitalypb.RefTransactionServer, registry *sidechannel.Registry) backchannel.ServerFactory

NewBackchannelServerFactory returns a ServerFactory that serves the RefTransactionServer on the backchannel connection.

func NewGRPCServer

func NewGRPCServer(
	conf config.Config,
	logger *logrus.Entry,
	registry *protoregistry.Registry,
	director proxy.StreamDirector,
	txMgr *transactions.Manager,
	rs datastore.RepositoryStore,
	assignmentStore AssignmentStore,
	conns Connections,
	primaryGetter PrimaryGetter,
	creds credentials.TransportCredentials,
	checks []service.CheckFunc,
	grpcOpts ...grpc.ServerOption,
) *grpc.Server

NewGRPCServer returns a gRPC server with the proxy handler registered, along with the services that Praefect serves on its own. It includes the set of unary and stream interceptors required for logging, authentication, etc.

func RemoveRepositoryHandler

func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler

RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and deletes the repository from every backing Gitaly node.

func RenameRepositoryFeatureFlagger

func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor

RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request itself or proxy it to a Gitaly. Rolling out Praefect-generated replica paths is difficult because the atomicity fixes depend on the unique replica paths. If the unique replica paths are disabled, the in-place rename handling no longer makes sense either. Since neither works in isolation, this function decides which handling to use based on whether the repository uses a Praefect-generated replica path. Repositories with client-set paths are handled non-atomically by proxying the call to the Gitalys. Praefect-generated paths are always handled atomically, regardless of whether the feature flag is disabled later.

This function peeks at the first request and either forwards the call to a Gitaly or handles it in Praefect. This requires peeking into the internals of the proxying so that the frame can be restored correctly.
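The decision described above depends only on the repository's replica path, not on the current flag state. A minimal sketch, assuming that Praefect-generated replica paths can be recognised by a `@cluster/` prefix; that prefix, the `renameHandling` type, and `chooseRenameHandling` are assumptions for illustration, and the peeking and frame restoration are not shown.

```go
package main

import (
	"fmt"
	"strings"
)

// renameHandling names the two ways a RenameRepository call can be served.
type renameHandling int

const (
	proxyToGitaly    renameHandling = iota // non-atomic: forward the call to a Gitaly
	handleInPraefect                       // atomic: rename in Praefect's database
)

// praefectPathPrefix is an ASSUMPTION made for this sketch: it stands in for
// whatever marks a replica path as Praefect-generated.
const praefectPathPrefix = "@cluster/"

// chooseRenameHandling looks only at the replica path, so the choice for a
// given repository does not change if the feature flag is toggled later.
func chooseRenameHandling(replicaPath string) renameHandling {
	if strings.HasPrefix(replicaPath, praefectPathPrefix) {
		return handleInPraefect
	}
	return proxyToGitaly
}

func main() {
	for _, path := range []string{"@cluster/repositories/ab/cd/42", "@hashed/ab/cd/abcd.git"} {
		fmt.Println(path, chooseRenameHandling(path) == handleInPraefect)
	}
}
```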

func RenameRepositoryHandler

func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler

RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming the repository in the lookup table stored in the database.

func RepositoryExistsHandler

func RepositoryExistsHandler(rs datastore.RepositoryStore) grpc.StreamHandler

RepositoryExistsHandler handles /gitaly.RepositoryService/RepositoryExists calls by checking whether there is a record of the repository in the database.

func RunPraefectServer added in v15.3.0

func RunPraefectServer(
	tb testing.TB,
	ctx context.Context,
	conf config.Config,
	opt BuildOptions,
) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup)

RunPraefectServer starts the Praefect service based on the passed-in configuration and options. The caller is responsible for calling the returned testhelper.Cleanup to stop the service and release all acquired resources. The function should be used only for testing purposes, not as part of production code.

func WithDelayMetric

func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithDelayMetric is an option to set the delay Prometheus metric.

func WithDequeueBatchSize

func WithDequeueBatchSize(size uint) func(*ReplMgr)

WithDequeueBatchSize configures the number of events to dequeue in a single batch.

func WithLatencyMetric

func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithLatencyMetric is an option to set the latency Prometheus metric.

func WithMockBackends added in v15.3.0

func WithMockBackends(tb testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup

WithMockBackends mocks backends with a set of passed in stubs.

func WithParallelStorageProcessingWorkers

func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr)

WithParallelStorageProcessingWorkers configures the number of workers used to process replication events per virtual storage.
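The With… options for ReplMgr follow Go's functional-options pattern: each returns a `func(*ReplMgr)` that mutates the manager while it is being constructed. The sketch below uses a simplified stand-in `replMgr` with hypothetical field names and hypothetical default values; it illustrates the pattern only and does not reflect ReplMgr's real internals.

```go
package main

import "fmt"

// replMgr is a simplified, hypothetical stand-in for praefect.ReplMgr that only
// models the two settings configured below.
type replMgr struct {
	dequeueBatchSize uint
	parallelWorkers  uint
}

// ReplMgrOpt mirrors the documented option type.
type ReplMgrOpt func(*replMgr)

func WithDequeueBatchSize(size uint) func(*replMgr) {
	return func(r *replMgr) { r.dequeueBatchSize = size }
}

func WithParallelStorageProcessingWorkers(n uint) func(*replMgr) {
	return func(r *replMgr) { r.parallelWorkers = n }
}

// newReplMgr applies hypothetical defaults first, then each option in order,
// so later options win.
func newReplMgr(opts ...ReplMgrOpt) replMgr {
	r := replMgr{dequeueBatchSize: 10, parallelWorkers: 1}
	for _, opt := range opts {
		opt(&r)
	}
	return r
}

func main() {
	r := newReplMgr(WithDequeueBatchSize(50), WithParallelStorageProcessingWorkers(4))
	fmt.Println(r.dequeueBatchSize, r.parallelWorkers) // 50 4
}
```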

Types

type AssignmentGetter

type AssignmentGetter interface {
	// GetHostAssignments returns the names of the storages assigned to host the repository.
	// The primary node must always be assigned.
	GetHostAssignments(ctx context.Context, virtualStorage string, repositoryID int64) ([]string, error)
}

AssignmentGetter is an interface for getting repository host node assignments.

type AssignmentStore

type AssignmentStore interface {
	AssignmentGetter
	// SetReplicationFactor sets a repository's replication factor to the desired value and returns the
	// resulting assignments.
	SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
}

AssignmentStore is the interface which Praefect uses to operate on repository assignments.

func NewDisabledAssignmentStore

func NewDisabledAssignmentStore(storages map[string][]string) AssignmentStore

NewDisabledAssignmentStore returns an assignment store that can be used if no database is configured. It reports every configured storage as assigned and returns an error when trying to set assignments.
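A minimal sketch of the behaviour described for the disabled store, assuming a map from virtual storage name to configured storage names (the same shape as the `storages` argument). The type name `disabledAssignmentStore` and the error value are hypothetical, and the real store's error text may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errAssignmentsDisabled is a hypothetical error value for this sketch.
var errAssignmentsDisabled = errors.New("assignments are disabled")

// disabledAssignmentStore maps virtual storage names to their configured storages.
type disabledAssignmentStore map[string][]string

// GetHostAssignments reports every configured storage as assigned; the
// repository ID is ignored because no database is available to consult.
func (s disabledAssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage string, repositoryID int64) ([]string, error) {
	return s[virtualStorage], nil
}

// SetReplicationFactor always fails because assignments cannot be persisted.
func (s disabledAssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) {
	return nil, errAssignmentsDisabled
}

func main() {
	ctx := context.Background()
	store := disabledAssignmentStore{"default": {"gitaly-1", "gitaly-2"}}

	assigned, _ := store.GetHostAssignments(ctx, "default", 1)
	fmt.Println(assigned) // [gitaly-1 gitaly-2]

	_, err := store.SetReplicationFactor(ctx, "default", "repo.git", 3)
	fmt.Println(err) // assignments are disabled
}
```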

type Backoff

type Backoff func() time.Duration

Backoff returns the next backoff duration.

type BackoffFactory

type BackoffFactory interface {
	// Create returns a new backoff provider and a reset function for it.
	Create() (Backoff, BackoffReset)
}

BackoffFactory creates a backoff function and a matching reset function.

type BackoffReset

type BackoffReset func()

BackoffReset resets backoff provider.

type BuildOptions added in v15.3.0

type BuildOptions struct {
	// WithQueue sets the implementation of the replication queue to be used by the Praefect service.
	WithQueue datastore.ReplicationEventQueue
	// WithTxMgr sets the transaction manager to be used by the Praefect service.
	WithTxMgr *transactions.Manager
	// WithBackends sets a callback that is triggered during initialization.
	WithBackends func([]*config.VirtualStorage) []testhelper.Cleanup
	// WithAnnotations sets the proto registry to be used by the Praefect service.
	WithAnnotations *protoregistry.Registry
	// WithLogger sets the logger to be used by the Praefect service.
	WithLogger *logrus.Entry
	// WithNodeMgr sets the implementation of the node manager to be used by the Praefect service.
	WithNodeMgr nodes.Manager
	// WithRepoStore sets the implementation of the repository store to be used by the Praefect service.
	WithRepoStore datastore.RepositoryStore
	// WithAssignmentStore sets the implementation of the assignment store to be used by the Praefect service.
	WithAssignmentStore AssignmentStore
	// WithConnections sets the connections to the Gitaly nodes.
	WithConnections Connections
	// WithPrimaryGetter sets the implementation of the primary node getter to be used by the Praefect service.
	WithPrimaryGetter PrimaryGetter
	// WithRouter sets the implementation of the request router to be used by the Praefect service.
	WithRouter Router
	// WithChecks sets the list of checks to run when the ReadinessCheck RPC is called.
	WithChecks []service.CheckFunc
}

BuildOptions is a set of configuration options that can be set to configure the Praefect service.

type Connections

type Connections map[string]map[string]*grpc.ClientConn

Connections holds the connections to the configured storage nodes, keyed by virtual storage name and then by storage name.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator takes care of directing client requests to the appropriate downstream server. The coordinator is thread safe; concurrent calls to register nodes are safe.

func NewCoordinator

NewCoordinator returns a new Coordinator that uses the provided logger.

func (*Coordinator) Collect

func (c *Coordinator) Collect(metrics chan<- prometheus.Metric)

func (*Coordinator) Describe

func (c *Coordinator) Describe(descs chan<- *prometheus.Desc)

func (*Coordinator) StreamDirector

func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error)

StreamDirector determines which downstream servers receive requests.

type ExpBackoffFactory

type ExpBackoffFactory struct {
	Start, Max time.Duration
}

ExpBackoffFactory creates exponentially growing durations.

func (ExpBackoffFactory) Create

func (b ExpBackoffFactory) Create() (Backoff, BackoffReset)

Create returns a backoff function based on Start and Max time durations.
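A minimal sketch of an exponential BackoffFactory, assuming each call doubles the previous duration and clamps it to Max; the exact growth factor Praefect uses is not stated here, so treat the doubling as an assumption. The Backoff, BackoffReset, and ExpBackoffFactory types are redeclared locally so the example runs on its own.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented types so the sketch is self-contained.
type Backoff func() time.Duration

type BackoffReset func()

type ExpBackoffFactory struct {
	Start, Max time.Duration
}

// Create returns a backoff that yields Start on the first call and doubles on
// each subsequent call, never exceeding Max, plus a reset that rewinds it to Start.
func (b ExpBackoffFactory) Create() (Backoff, BackoffReset) {
	next := b.Start
	backoff := func() time.Duration {
		current := next
		if current > b.Max {
			current = b.Max
		}
		next = current * 2
		return current
	}
	reset := func() { next = b.Start }
	return backoff, reset
}

func main() {
	backoff, reset := ExpBackoffFactory{Start: 100 * time.Millisecond, Max: time.Second}.Create()
	for i := 0; i < 6; i++ {
		fmt.Print(backoff(), " ")
	}
	fmt.Println() // the loop prints: 100ms 200ms 400ms 800ms 1s 1s
	reset()
	fmt.Println(backoff()) // 100ms
}
```

The returned closures share state without locking, so each backoff/reset pair in this sketch is meant to be used from a single goroutine.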

type HealthChecker

type HealthChecker interface {
	// HealthyNodes gets a list of healthy storages by their virtual storage.
	HealthyNodes() map[string][]string
}

HealthChecker manages information about locally healthy nodes.

type MetadataVerifier

type MetadataVerifier struct {
	// contains filtered or unexported fields
}

MetadataVerifier verifies the repository metadata against the actual replicas on the Gitaly nodes. It queries the database for replicas that haven't been verified in a given time and checks whether the Gitalys still have them. If a Gitaly doesn't have a replica, the replica's metadata record is removed and the removal logged. The repository's record is still left in place even if all of the replicas are lost to ensure the data loss doesn't go unnoticed.
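The verification pass described above can be sketched over in-memory records. This is a simplification under stated assumptions: the `replica` type, the `exists` callback (standing in for asking a Gitaly whether it still has the replica), and the choice to keep missing replicas untouched when deletions are disabled are all hypothetical. Database queries, health checks, and lease handling are not modelled.

```go
package main

import (
	"fmt"
	"time"
)

// replica is a hypothetical in-memory stand-in for a replica metadata record.
type replica struct {
	RepositoryID int64
	Storage      string
	LastVerified time.Time
}

// verifyReplicas re-checks replicas not verified within interval. Replicas that
// still exist get a fresh timestamp; missing ones are removed only when
// performDeletions is set. Repository records are not touched (not modelled here).
func verifyReplicas(now time.Time, interval time.Duration, replicas []replica,
	exists func(replica) bool, performDeletions bool) (kept, removed []replica) {
	for _, r := range replicas {
		switch {
		case now.Sub(r.LastVerified) < interval:
			kept = append(kept, r) // verified recently enough; skip the check
		case exists(r):
			r.LastVerified = now
			kept = append(kept, r)
		case performDeletions:
			removed = append(removed, r) // a real verifier would also log this removal
		default:
			kept = append(kept, r) // missing, but deletions are disabled
		}
	}
	return kept, removed
}

func main() {
	now := time.Now()
	old := now.Add(-48 * time.Hour)
	replicas := []replica{
		{RepositoryID: 1, Storage: "gitaly-1", LastVerified: old},
		{RepositoryID: 1, Storage: "gitaly-2", LastVerified: old},
	}
	onDisk := map[string]bool{"gitaly-1": true} // gitaly-2 has lost its replica

	kept, removed := verifyReplicas(now, 24*time.Hour, replicas,
		func(r replica) bool { return onDisk[r.Storage] }, true)
	fmt.Println(len(kept), removed[0].Storage) // 1 gitaly-2
}
```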

func NewMetadataVerifier

func NewMetadataVerifier(
	log logrus.FieldLogger,
	db glsql.Querier,
	conns Connections,
	healthChecker HealthChecker,
	verificationInterval time.Duration,
	performDeletions bool,
) *MetadataVerifier

NewMetadataVerifier creates a new MetadataVerifier.

func (*MetadataVerifier) Collect

func (v *MetadataVerifier) Collect(ch chan<- prometheus.Metric)

Collect collects the metrics exposed from the MetadataVerifier.

func (*MetadataVerifier) Describe

func (v *MetadataVerifier) Describe(ch chan<- *prometheus.Desc)

Describe describes the collected metrics to Prometheus.

func (*MetadataVerifier) Run

func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error

Run runs the metadata verifier. It keeps running until the context is canceled.

func (*MetadataVerifier) RunExpiredLeaseReleaser

func (v *MetadataVerifier) RunExpiredLeaseReleaser(ctx context.Context, ticker helper.Ticker) error

RunExpiredLeaseReleaser releases expired leases on every tick. It keeps running until the context is canceled.

type Node

type Node struct {
	// Storage is the name of the storage node.
	Storage string
	// Address is the address of the node.
	Address string
	// Token is the authentication token of the node.
	Token string
	// Connection is a gRPC connection to the storage node.
	Connection *grpc.ClientConn
}

Node is a storage node in a virtual storage.

type NodeSet

type NodeSet map[string]map[string]Node

NodeSet contains nodes by their virtual storage and storage names.

func DialNodes

func DialNodes(
	ctx context.Context,
	virtualStorages []*config.VirtualStorage,
	registry *protoregistry.Registry,
	errorTracker tracker.ErrorTracker,
	handshaker client.Handshaker,
	sidechannelRegistry *sidechannel.Registry,
) (NodeSet, error)

DialNodes dials the configured storage nodes.

func NodeSetFromNodeManager

func NodeSetFromNodeManager(mgr nodes.Manager) NodeSet

NodeSetFromNodeManager converts connections set up by the node manager into a NodeSet. This is a temporary adapter required due to cyclic imports between the praefect and nodes packages.

func (NodeSet) Close

func (set NodeSet) Close()

Close closes the connections in the NodeSet. Errors on closing are ignored.

func (NodeSet) Connections

func (set NodeSet) Connections() Connections

Connections is a convenience method to return the connections from the NodeSet.
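NodeSet and Connections share the same two-level nesting: virtual storage name, then storage name. A sketch of how Connections could flatten a NodeSet while keeping that nesting, with a placeholder `clientConn` type standing in for *grpc.ClientConn so the example has no dependencies; this is an illustration, not the package's code.

```go
package main

import "fmt"

// clientConn is a placeholder for *grpc.ClientConn in this sketch.
type clientConn struct{ target string }

type Node struct {
	Storage    string
	Address    string
	Token      string
	Connection *clientConn
}

// NodeSet and Connections mirror the documented nesting:
// virtual storage name -> storage name -> value.
type NodeSet map[string]map[string]Node

type Connections map[string]map[string]*clientConn

// Connections keeps the nesting but reduces each Node to its connection.
func (set NodeSet) Connections() Connections {
	conns := make(Connections, len(set))
	for virtualStorage, nodes := range set {
		conns[virtualStorage] = make(map[string]*clientConn, len(nodes))
		for storage, node := range nodes {
			conns[virtualStorage][storage] = node.Connection
		}
	}
	return conns
}

func main() {
	set := NodeSet{"default": {
		"gitaly-1": {Storage: "gitaly-1", Address: "tcp://gitaly-1:8075", Connection: &clientConn{target: "gitaly-1:8075"}},
	}}
	fmt.Println(set.Connections()["default"]["gitaly-1"].target) // gitaly-1:8075
}
```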

func (NodeSet) HealthClients

func (set NodeSet) HealthClients() nodes.HealthClients

HealthClients is a convenience method to return the HealthClients from the NodeSet.

type PerRepositoryRouter

type PerRepositoryRouter struct {
	// contains filtered or unexported fields
}

PerRepositoryRouter implements a router that routes requests respecting per-repository primary nodes.

func NewPerRepositoryRouter

func NewPerRepositoryRouter(
	conns Connections,
	pg PrimaryGetter,
	hc HealthChecker,
	rand Random,
	csg datastore.ConsistentStoragesGetter,
	ag AssignmentGetter,
	rs datastore.RepositoryStore,
	defaultReplicationFactors map[string]int,
) *PerRepositoryRouter

NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration.

func (*PerRepositoryRouter) RouteRepositoryAccessor

func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)

func (*PerRepositoryRouter) RouteRepositoryCreation

func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error)

RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as replication targets.
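A simplified sketch of the creation rule above, assuming the candidate storages are already known (assignment and replication-factor handling are omitted) and that randomness is injected as an `intn` function so it can be made deterministic. The names `routeCreation`, `creationRoute`, and `errNoHealthyNodes` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// errNoHealthyNodes stands in for praefect.ErrNoHealthyNodes in this sketch.
var errNoHealthyNodes = errors.New("no healthy nodes")

// creationRoute is a trimmed-down, hypothetical version of RepositoryMutatorRoute.
type creationRoute struct {
	Primary            string
	Secondaries        []string // healthy: take part in the transaction
	ReplicationTargets []string // unhealthy: receive the change via replication
}

// routeCreation picks a random healthy storage as primary; every other storage
// becomes a secondary if healthy, or a replication target if not.
func routeCreation(intn func(n int) int, storages []string, healthy map[string]bool) (creationRoute, error) {
	var candidates []string
	for _, s := range storages {
		if healthy[s] {
			candidates = append(candidates, s)
		}
	}
	if len(candidates) == 0 {
		return creationRoute{}, errNoHealthyNodes
	}

	route := creationRoute{Primary: candidates[intn(len(candidates))]}
	for _, s := range storages {
		switch {
		case s == route.Primary:
			// the primary is neither a secondary nor a replication target
		case healthy[s]:
			route.Secondaries = append(route.Secondaries, s)
		default:
			route.ReplicationTargets = append(route.ReplicationTargets, s)
		}
	}
	return route, nil
}

func main() {
	route, err := routeCreation(rand.Intn,
		[]string{"gitaly-1", "gitaly-2", "gitaly-3"},
		map[string]bool{"gitaly-1": true, "gitaly-2": true})
	fmt.Printf("%+v %v\n", route, err)
}
```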

func (*PerRepositoryRouter) RouteRepositoryMaintenance

func (r *PerRepositoryRouter) RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)

RouteRepositoryMaintenance routes the maintenance call to all healthy nodes on a best-effort basis. It does not return an error if the primary node is unhealthy, but it does if all nodes are unhealthy.
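A sketch of the best-effort rule, with hypothetical names: every healthy storage receives the call whether or not its replica is up to date, an unhealthy primary is simply skipped, and an error is returned only when no storage is healthy.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoHealthyNodes stands in for praefect.ErrNoHealthyNodes in this sketch.
var errNoHealthyNodes = errors.New("no healthy nodes")

// routeMaintenance returns every healthy storage, whether or not it is the
// primary or holds an up-to-date replica. It fails only if none is healthy.
func routeMaintenance(storages []string, healthy map[string]bool) ([]string, error) {
	var nodes []string
	for _, s := range storages {
		if healthy[s] {
			nodes = append(nodes, s)
		}
	}
	if len(nodes) == 0 {
		return nil, errNoHealthyNodes
	}
	return nodes, nil
}

func main() {
	// gitaly-1 is the (unhealthy) primary; the call still goes to the others.
	nodes, err := routeMaintenance([]string{"gitaly-1", "gitaly-2", "gitaly-3"},
		map[string]bool{"gitaly-2": true, "gitaly-3": true})
	fmt.Println(nodes, err) // [gitaly-2 gitaly-3] <nil>
}
```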

func (*PerRepositoryRouter) RouteRepositoryMutator

func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRelativePath string) (RepositoryMutatorRoute, error)

func (*PerRepositoryRouter) RouteStorageAccessor

func (r *PerRepositoryRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)

RouteStorageAccessor routes requests for storage-scoped accessor RPCs. The only storage-scoped accessor RPC is RemoteService/FindRemoteRepository, which executes a command without a repository. Any Gitaly server can serve it, as it does not depend on the server's state.

func (*PerRepositoryRouter) RouteStorageMutator

func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)

RouteStorageMutator is not implemented here. The only storage-scoped mutator RPCs relate to namespace operations. These are no longer relevant now that hashed storage is the default everywhere, and they should eventually be removed.

type PrimaryGetter

type PrimaryGetter interface {
	// GetPrimary returns the primary storage for a given repository.
	GetPrimary(ctx context.Context, virtualStorage string, repositoryID int64) (string, error)
}

PrimaryGetter is an interface for getting a primary of a repository.

type Random

type Random interface {
	// Intn returns a random integer in the range [0,n).
	Intn(n int) int
	// Shuffle pseudo-randomizes the order of elements. n is the number of elements.
	// Shuffle panics if n < 0. swap swaps the elements with indexes i and j.
	Shuffle(n int, swap func(i, j int))
}

Random is the interface of the Go random number generator.

func NewLockedRandom

func NewLockedRandom(r Random) Random

NewLockedRandom wraps the passed in Random to make it safe for concurrent use.
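One straightforward way to make a Random safe for concurrent use is to guard every call with a mutex. This is a sketch of that approach, not necessarily how NewLockedRandom is implemented; the `lockedRandom` type is hypothetical. A *rand.Rand from math/rand satisfies the Random interface, but is not itself safe for concurrent use.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// Random mirrors the documented interface; *rand.Rand satisfies it.
type Random interface {
	Intn(n int) int
	Shuffle(n int, swap func(i, j int))
}

// lockedRandom serialises access to an underlying Random.
type lockedRandom struct {
	m sync.Mutex
	r Random
}

func NewLockedRandom(r Random) Random { return &lockedRandom{r: r} }

func (lr *lockedRandom) Intn(n int) int {
	lr.m.Lock()
	defer lr.m.Unlock()
	return lr.r.Intn(n)
}

func (lr *lockedRandom) Shuffle(n int, swap func(i, j int)) {
	lr.m.Lock()
	defer lr.m.Unlock()
	lr.r.Shuffle(n, swap)
}

func main() {
	r := NewLockedRandom(rand.New(rand.NewSource(1)))
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Intn(10) // safe to call from several goroutines
		}()
	}
	wg.Wait()
	fmt.Println("done")
}
```

Shuffle holds the lock while invoking swap, so a swap callback that calls back into the same Random would deadlock in this sketch.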

type ReplMgr

type ReplMgr struct {
	// contains filtered or unexported fields
}

ReplMgr is a replication manager for handling replication jobs.

func NewReplMgr

func NewReplMgr(log logrus.FieldLogger, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr

NewReplMgr initializes a replication manager with the provided dependencies and options.

func (ReplMgr) Collect

func (r ReplMgr) Collect(ch chan<- prometheus.Metric)

func (ReplMgr) Describe

func (r ReplMgr) Describe(ch chan<- *prometheus.Desc)

func (ReplMgr) ProcessBacklog

func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory)

ProcessBacklog starts processing queued jobs. It keeps processing jobs until ctx is done, and blocks until all backlog-processing goroutines have returned.

func (ReplMgr) ProcessReplicationEvent

func (r ReplMgr) ProcessReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error

ProcessReplicationEvent processes a single replication event, given the target client connection.

func (ReplMgr) ProcessStale

func (r ReplMgr) ProcessStale(ctx context.Context, ticker helper.Ticker, staleAfter time.Duration) chan struct{}

ProcessStale starts a background process to acknowledge stale replication jobs. It will process jobs until ctx is Done.

type ReplMgrOpt

type ReplMgrOpt func(*ReplMgr)

ReplMgrOpt allows a replication manager to be configured with additional options.

type Replicator

type Replicator interface {
	// Replicate propagates changes from the source to the target
	Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
	// Destroy removes the target repository on the specified target connection
	Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// Rename renames (moves) the target repository on the specified target connection
	Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}

Replicator performs the actual replication logic between two nodes.

type RepositoryAccessorRoute

type RepositoryAccessorRoute struct {
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// Node contains the details of the node that should handle the request.
	Node RouterNode
}

RepositoryAccessorRoute describes how to route a repository scoped accessor call.

type RepositoryMaintenanceRoute

type RepositoryMaintenanceRoute struct {
	// RepositoryID is the repository's ID as Praefect identifies it.
	RepositoryID int64
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// Nodes contains all the nodes the call should be routed to.
	Nodes []RouterNode
}

RepositoryMaintenanceRoute describes how to route a repository scoped maintenance call.

type RepositoryMutatorRoute

type RepositoryMutatorRoute struct {
	// RepositoryID is the repository's ID as Praefect identifies it.
	RepositoryID int64
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// AdditionalReplicaPath is the disk path where the possible additional repository in the request
	// is stored. This is only used for object pools.
	AdditionalReplicaPath string
	// Primary is the primary node of the transaction.
	Primary RouterNode
	// Secondaries are the secondary nodes participating in the transaction.
	Secondaries []RouterNode
	// ReplicationTargets are additional nodes that do not participate in a transaction
	// but need the changes replicated.
	ReplicationTargets []string
}

RepositoryMutatorRoute describes how to route a repository scoped mutator call.

type Router

type Router interface {
	// RouteStorageAccessor returns the node which should serve the storage accessor request.
	RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)
	// RouteStorageMutator returns the primary and secondaries that should handle the storage
	// mutator request.
	RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
	// RouteRepositoryAccessor returns the node that should serve the repository accessor
	// request. If forcePrimary is set to `true`, it returns the primary node.
	RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
	// RouteRepositoryMutator returns the primary and secondaries that should handle the repository mutator request.
	// Additionally, it returns the nodes to which the change should be replicated. RouteRepositoryMutator should
	// only be used with existing repositories.
	RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
	// RouteRepositoryCreation returns the primary and secondaries that should handle the repository creation
	// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
	RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
	// RouteRepositoryMaintenance routes the given maintenance-style RPC to all nodes which
	// should perform maintenance. This would typically include all online nodes, regardless of
	// whether the repository hosted by them is up-to-date or not. Maintenance tasks should
	// never be replicated.
	RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}

Router decides which nodes to direct accessor and mutator RPCs to.

func NewNodeManagerRouter

func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router

NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.

type RouterNode

type RouterNode struct {
	// Storage is storage of the node.
	Storage string
	// Connection is the connection to the node.
	Connection *grpc.ClientConn
}

RouterNode is a subset of a node's configuration needed to perform request routing.

type ServerFactory

type ServerFactory struct {
	// contains filtered or unexported fields
}

ServerFactory is a factory of Praefect gRPC servers.

func NewServerFactory

func NewServerFactory(
	conf config.Config,
	logger *logrus.Entry,
	director proxy.StreamDirector,
	nodeMgr nodes.Manager,
	txMgr *transactions.Manager,
	queue datastore.ReplicationEventQueue,
	rs datastore.RepositoryStore,
	assignmentStore AssignmentStore,
	registry *protoregistry.Registry,
	conns Connections,
	primaryGetter PrimaryGetter,
	checks []service.CheckFunc,
) *ServerFactory

NewServerFactory returns a factory object for initializing Praefect gRPC servers.

func (*ServerFactory) Create

func (s *ServerFactory) Create(secure bool) (*grpc.Server, error)

Create returns a newly instantiated gRPC server, initialized with interceptors.

func (*ServerFactory) GracefulStop

func (s *ServerFactory) GracefulStop()

GracefulStop stops both the secure and insecure servers gracefully.

func (*ServerFactory) Serve

func (s *ServerFactory) Serve(l net.Listener, secure bool) error

Serve starts serving on the provided listener with a newly created grpc.Server.

func (*ServerFactory) Stop

func (s *ServerFactory) Stop()

Stop stops all servers created by the factory.

type StaticHealthChecker

type StaticHealthChecker map[string][]string

StaticHealthChecker reports its configured nodes as always healthy.

func (StaticHealthChecker) HealthyNodes

func (healthyNodes StaticHealthChecker) HealthyNodes() map[string][]string
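Because StaticHealthChecker is just a map, it can satisfy HealthChecker by returning itself, which makes it convenient wherever a fixed set of healthy nodes is wanted, for example in tests. A sketch assuming HealthyNodes returns the map unchanged; the interface and type are redeclared locally so the example runs on its own.

```go
package main

import "fmt"

// HealthChecker mirrors the documented interface.
type HealthChecker interface {
	HealthyNodes() map[string][]string
}

// StaticHealthChecker maps virtual storage names to storages that are always healthy.
type StaticHealthChecker map[string][]string

// HealthyNodes returns the configured map unchanged.
func (healthyNodes StaticHealthChecker) HealthyNodes() map[string][]string {
	return healthyNodes
}

// Compile-time check that the map type satisfies the interface.
var _ HealthChecker = StaticHealthChecker(nil)

func main() {
	var hc HealthChecker = StaticHealthChecker{"default": {"gitaly-1", "gitaly-2"}}
	fmt.Println(hc.HealthyNodes()["default"]) // [gitaly-1 gitaly-2]
}
```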

type StorageMutatorRoute

type StorageMutatorRoute struct {
	// Primary is the primary node of the routing decision.
	Primary RouterNode
	// Secondaries are the secondary nodes of the routing decision.
	Secondaries []RouterNode
}

StorageMutatorRoute describes how to route a storage scoped mutator call.

Directories

Path              Synopsis
commonerr         Package commonerr contains common errors between different Praefect components.
datastore         Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
  advisorylock    Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
  glsql           Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.
grpc-proxy
  proxy           Package proxy provides a reverse proxy handler for gRPC.
