praefect

package
v14.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2021 License: MIT Imports: 59 Imported by: 0

Documentation

Overview

Package praefect is a Gitaly reverse proxy for transparently routing gRPC calls to a set of Gitaly services.

Index

Constants

This section is empty.

Variables

View Source
var ErrNoHealthyNodes = errors.New("no healthy nodes")

ErrNoHealthyNodes is returned when there are no healthy nodes to serve a request.

View Source
var ErrNoSuitableNode = errors.New("no suitable node to serve the request")

ErrNoSuitableNode is returned when there is not suitable node to serve a request.

View Source
var ErrRepositoryReadOnly = helper.ErrPreconditionFailedf("repository is in read-only mode")

ErrRepositoryReadOnly is returned when the repository is in read-only mode. This happens if the primary does not have the latest changes.

Functions

func GetBuildTime

func GetBuildTime() string

GetBuildTime returns the time at which the build took place

func GetVersion

func GetVersion() string

GetVersion returns the semver compatible version number

func GetVersionString

func GetVersionString() string

GetVersionString returns a standard version header

func NewBackchannelServerFactory

func NewBackchannelServerFactory(logger *logrus.Entry, svc gitalypb.RefTransactionServer) backchannel.ServerFactory

NewBackchannelServerFactory returns a ServerFactory that serves the RefTransactionServer on the backchannel connection.

func NewGRPCServer

func NewGRPCServer(
	conf config.Config,
	logger *logrus.Entry,
	registry *protoregistry.Registry,
	director proxy.StreamDirector,
	nodeMgr nodes.Manager,
	txMgr *transactions.Manager,
	queue datastore.ReplicationEventQueue,
	rs datastore.RepositoryStore,
	assignmentStore AssignmentStore,
	conns Connections,
	primaryGetter PrimaryGetter,
	grpcOpts ...grpc.ServerOption,
) *grpc.Server

NewGRPCServer returns gRPC server with registered proxy-handler and actual services praefect serves on its own. It includes a set of unary and stream interceptors required to add logging, authentication, etc.

func RepositoryExistsStreamInterceptor

func RepositoryExistsStreamInterceptor(rs datastore.RepositoryStore) grpc.StreamServerInterceptor

RepositoryExistsStreamInterceptor returns a stream interceptor that handles /gitaly.RepositoryService/RepositoryExists calls by checking whether there is a record of the repository in the database.

func WithDelayMetric

func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithDelayMetric is an option to set the delay prometheus metric

func WithDequeueBatchSize

func WithDequeueBatchSize(size uint) func(*ReplMgr)

WithDequeueBatchSize configures the number of events to dequeue in a single batch.

func WithLatencyMetric

func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr)

WithLatencyMetric is an option to set the latency prometheus metric

Types

type AssignmentGetter

type AssignmentGetter interface {
	// GetHostAssignments returns the names of the storages assigned to host the repository.
	// The primary node must always be assigned.
	GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)
}

AssignmentGetter is an interface for getting repository host node assignments.

type AssignmentStore

type AssignmentStore interface {
	AssignmentGetter
	// SetReplicationFactor sets a repository's replication factor to the desired value and returns the
	// resulting assignments.
	SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error)
}

AssignmentStore is the interface which Praefect uses to operate on repository assignments.

func NewDisabledAssignmentStore

func NewDisabledAssignmentStore(storages map[string][]string) AssignmentStore

NewDisabledAssignmentStore returns an assignments store that can be used if no database is configured. It returns every configured storage as assigned and errors when trying to set assignments.

type BackoffFunc

type BackoffFunc func() (backoff, backoffReset)

BackoffFunc is a function that n turn provides a pair of functions backoff and backoffReset

func ExpBackoffFunc

func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc

ExpBackoffFunc generates a backoffFunc based off of start and max time durations

type Connections

type Connections map[string]map[string]*grpc.ClientConn

Connections is a set of connections to configured storage nodes by their virtual storages.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator takes care of directing client requests to the appropriate downstream server. The coordinator is thread safe; concurrent calls to register nodes are safe.

func NewCoordinator

NewCoordinator returns a new Coordinator that utilizes the provided logger

func (*Coordinator) Collect

func (c *Coordinator) Collect(metrics chan<- prometheus.Metric)

func (*Coordinator) Describe

func (c *Coordinator) Describe(descs chan<- *prometheus.Desc)

func (*Coordinator) StreamDirector

func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error)

StreamDirector determines which downstream servers receive requests

type HealthChecker

type HealthChecker interface {
	// HealthyNodes gets a list of healthy storages by their virtual storage.
	HealthyNodes() map[string][]string
}

HealthChecker manages information of locally healthy nodes.

type Node

type Node struct {
	// Storage is the name of the storage node.
	Storage string
	// Address is the address of the node.
	Address string
	// Token is the authentication token of the node.
	Token string
	// Connection is a gRPC connection to the storage node.
	Connection *grpc.ClientConn
}

Node is a storage node in a virtual storage.

type NodeSet

type NodeSet map[string]map[string]Node

NodeSet contains nodes by their virtual storage and storage names.

func DialNodes

func DialNodes(
	ctx context.Context,
	virtualStorages []*config.VirtualStorage,
	registry *protoregistry.Registry,
	errorTracker tracker.ErrorTracker,
	handshaker client.Handshaker,
) (NodeSet, error)

DialNodes dials the configured storage nodes.

func NodeSetFromNodeManager

func NodeSetFromNodeManager(mgr nodes.Manager) NodeSet

NodeSetFromNodeManager converts connections set up by the node manager in to a NodeSet. This is a temporary adapter required due to cyclic imports between the praefect and nodes packages.

func (NodeSet) Close

func (set NodeSet) Close()

Close closes the connections in the NodeSet. Errors on closing are ignored.

func (NodeSet) Connections

func (set NodeSet) Connections() Connections

Connections is a convenience method to return the connections from the NodeSet.

func (NodeSet) HealthClients

func (set NodeSet) HealthClients() nodes.HealthClients

HealthClients is a convenience method to return the HealthClients from the NodeSet.

type PerRepositoryRouter

type PerRepositoryRouter struct {
	// contains filtered or unexported fields
}

PerRepositoryRouter implements a router that routes requests respecting per repository primary nodes.

func NewPerRepositoryRouter

func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, csg datastore.ConsistentStoragesGetter, ag AssignmentGetter, defaultReplicationFactors map[string]int) *PerRepositoryRouter

NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration.

func (*PerRepositoryRouter) RouteRepositoryAccessor

func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error)

func (*PerRepositoryRouter) RouteRepositoryCreation

func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error)

RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as replication targets.

func (*PerRepositoryRouter) RouteRepositoryMutator

func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)

func (*PerRepositoryRouter) RouteStorageAccessor

func (r *PerRepositoryRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)

RouteStorageAccessor routes requests for storage-scoped accessor RPCs. The only storage scoped accessor RPC is RemoteService/FindRemoteRepository, which in turn executes a command without a repository. This can be done by any Gitaly server as it doesn't depend on the state on the server.

func (*PerRepositoryRouter) RouteStorageMutator

func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)

RouteStorageMutator is not implemented here. The only storage scoped mutator RPC is related to namespace operations. These are not relevant anymore, given hashed storage is default everywhere, and should be eventually removed.

type PrimaryGetter

type PrimaryGetter interface {
	// GetPrimary returns the primary storage for a given repository.
	GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error)
}

PrimaryGetter is an interface for getting a primary of a repository.

type Random

type Random interface {
	// Intn returns a random integer in the range [0,n).
	Intn(n int) int
	// Shuffle pseudo-randomizes the order of elements. n is the number of elements.
	// Shuffle panics if n < 0. swap swaps the elements with indexes i and j.
	Shuffle(n int, swap func(i, j int))
}

Random is the interface of the Go random number generator.

func NewLockedRandom

func NewLockedRandom(r Random) Random

NewLockedRandom wraps the passed in Random to make it safe for concurrent use.

type ReplMgr

type ReplMgr struct {
	// contains filtered or unexported fields
}

ReplMgr is a replication manager for handling replication jobs

func NewReplMgr

func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr

NewReplMgr initializes a replication manager with the provided dependencies and options

func (ReplMgr) Collect

func (r ReplMgr) Collect(ch chan<- prometheus.Metric)

func (ReplMgr) Describe

func (r ReplMgr) Describe(ch chan<- *prometheus.Desc)

func (ReplMgr) ProcessBacklog

func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc)

ProcessBacklog starts processing of queued jobs. It will be processing jobs until ctx is Done. ProcessBacklog blocks until all backlog processing goroutines have returned

func (ReplMgr) ProcessStale

func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.Duration) chan struct{}

ProcessStale starts a background process to acknowledge stale replication jobs. It will process jobs until ctx is Done.

type ReplMgrOpt

type ReplMgrOpt func(*ReplMgr)

ReplMgrOpt allows a replicator to be configured with additional options

func WithAllowlist

func WithAllowlist(allowlistedRepos []string) ReplMgrOpt

WithAllowlist will configure a allowlist for repos to allow replication

func WithReplicator

func WithReplicator(r Replicator) ReplMgrOpt

WithReplicator overrides the default replicator

type Replicator

type Replicator interface {
	// Replicate propagates changes from the source to the target
	Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
	// Destroy will remove the target repo on the specified target connection
	Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// Rename will rename(move) the target repo on the specified target connection
	Rename(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// GarbageCollect will run gc on the target repository
	GarbageCollect(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// RepackFull will do a full repack on the target repository
	RepackFull(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// RepackIncremental will do an incremental repack on the target repository
	RepackIncremental(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// Cleanup will do a cleanup on the target repository
	Cleanup(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
	// PackRefs will optimize references on the target repository
	PackRefs(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}

Replicator performs the actual replication logic between two nodes

type RepositoryMutatorRoute

type RepositoryMutatorRoute struct {
	// Primary is the primary node of the transaction.
	Primary RouterNode
	// Secondaries are the secondary participating in a transaction.
	Secondaries []RouterNode
	// ReplicationTargets are additional nodes that do not participate in a transaction
	// but need the changes replicated.
	ReplicationTargets []string
}

RepositoryMutatorRoute describes how to route a repository scoped mutator call.

type Router

type Router interface {
	// RouteStorageAccessor returns the node which should serve the storage accessor request.
	RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)
	// RouteStorageAccessor returns the primary and secondaries that should handle the storage
	// mutator request.
	RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
	// RouteRepositoryAccessor returns the node that should serve the repository accessor
	// request. If forcePrimary is set to `true`, it returns the primary node.
	RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error)
	// RouteRepositoryMutatorTransaction returns the primary and secondaries that should handle the repository mutator request.
	// Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator should only be used
	// with existing repositories.
	RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error)
	// RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation
	// request. It is up to the caller to store the assignments and primary information after finishing the RPC.
	RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error)
}

Router decides which nodes to direct accessor and mutator RPCs to.

func NewNodeManagerRouter

func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router

NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.

type RouterNode

type RouterNode struct {
	// Storage is storage of the node.
	Storage string
	// Connection is the connection to the node.
	Connection *grpc.ClientConn
}

RouterNode is a subset of a node's configuration needed to perform request routing.

type ServerFactory

type ServerFactory struct {
	// contains filtered or unexported fields
}

ServerFactory is a factory of praefect grpc servers

func NewServerFactory

func NewServerFactory(
	conf config.Config,
	logger *logrus.Entry,
	director proxy.StreamDirector,
	nodeMgr nodes.Manager,
	txMgr *transactions.Manager,
	queue datastore.ReplicationEventQueue,
	rs datastore.RepositoryStore,
	assignmentStore AssignmentStore,
	registry *protoregistry.Registry,
	conns Connections,
	primaryGetter PrimaryGetter,
) *ServerFactory

NewServerFactory returns factory object for initialization of praefect gRPC servers.

func (*ServerFactory) Create

func (s *ServerFactory) Create(secure bool) (*grpc.Server, error)

Create returns newly instantiated and initialized with interceptors instance of the gRPC server.

func (*ServerFactory) GracefulStop

func (s *ServerFactory) GracefulStop()

GracefulStop stops both the secure and insecure servers gracefully.

func (*ServerFactory) Serve

func (s *ServerFactory) Serve(l net.Listener, secure bool) error

Serve starts serving on the provided listener with newly created grpc.Server

func (*ServerFactory) Stop

func (s *ServerFactory) Stop()

Stop stops all servers created by the factory.

type StaticHealthChecker

type StaticHealthChecker map[string][]string

StaticHealthChecker returns the nodes as always healthy.

func (StaticHealthChecker) HealthyNodes

func (healthyNodes StaticHealthChecker) HealthyNodes() map[string][]string

type StaticStorageAssignments

type StaticStorageAssignments map[string][]string

StaticStorageAssignments is a static assignment of the same storages in a virtual storage for every repository.

func (StaticStorageAssignments) GetHostAssignments

func (st StaticStorageAssignments) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error)

type StorageMutatorRoute

type StorageMutatorRoute struct {
	// Primary is the primary node of the routing decision.
	Primary RouterNode
	// Secondaries are the secondary nodes of the routing decision.
	Secondaries []RouterNode
}

StorageMutatorRoute describes how to route a storage scoped mutator call.

Directories

Path Synopsis
Package commonerr contains common errors between different Praefect components.
Package commonerr contains common errors between different Praefect components.
Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
Package datastore provides data models and datastore persistence abstractions for tracking the state of repository replicas.
advisorylock
Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
Package advisorylock contains the lock IDs of all advisory locks used in Praefect.
glsql
Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.
Package glsql (Gitaly SQL) is a helper package to work with plain SQL queries.
grpc-proxy
proxy
Package proxy provides a reverse proxy handler for gRPC.
Package proxy provides a reverse proxy handler for gRPC.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL