nodes

package
v14.0.12
Published: Nov 5, 2021 License: MIT Imports: 29 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNoPrimary = errors.New("no primary")

ErrNoPrimary is returned if the repository does not have a primary.

var ErrPrimaryNotHealthy = errors.New("primary gitaly is not healthy")

ErrPrimaryNotHealthy indicates that the primary of a shard is not in a healthy state and hence should not be used for new requests.

var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist")

ErrVirtualStorageNotExist indicates that the node manager is not aware of the virtual storage for which a shard is being requested.

Functions

func Dial

func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, handshaker client.Handshaker) (*grpc.ClientConn, error)

Dial dials a node with the necessary interceptors configured.
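
As a minimal sketch, Dial might be called as below; the config.Node field names, the use of protoregistry.GitalyProtoPreregistered, and passing nil for the error tracker and handshaker are assumptions for illustration rather than documented guarantees.

func dialNode(ctx context.Context) (*grpc.ClientConn, error) {
	// The field names Storage, Address and Token are assumptions about the
	// praefect config package; adjust to the actual config.Node definition.
	node := &config.Node{
		Storage: "gitaly-1",
		Address: "tcp://gitaly-1.internal:8075",
		Token:   "secret-token",
	}
	// Passing nil for the error tracker and handshaker is assumed to be
	// acceptable here.
	return nodes.Dial(ctx, node, protoregistry.GitalyProtoPreregistered, nil, nil)
}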

func GeneratePraefectName

func GeneratePraefectName(c config.Config, log logrus.FieldLogger) string

GeneratePraefectName generates a name so that each Praefect process can report node statuses independently. This will enable us to do a SQL election to determine which nodes are active. Ideally this name doesn't change across restarts, since a change may temporarily make it look like there are more Praefect processes active when determining a quorum.

Types

type HealthClients

type HealthClients map[string]map[string]grpc_health_v1.HealthClient

HealthClients contains a HealthClient for every physical storage, keyed by virtual storage and physical storage name.
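
As a sketch, a HealthClients map can be assembled from existing per-storage gRPC connections; the helper below and its input shape are illustrative assumptions, only the HealthClients type itself is part of this package.

func buildHealthClients(conns map[string]map[string]*grpc.ClientConn) nodes.HealthClients {
	clients := make(nodes.HealthClients, len(conns))
	for virtualStorage, storages := range conns {
		clients[virtualStorage] = make(map[string]grpc_health_v1.HealthClient, len(storages))
		for storage, conn := range storages {
			// Wrap each connection with the standard gRPC health checking client.
			clients[virtualStorage][storage] = grpc_health_v1.NewHealthClient(conn)
		}
	}
	return clients
}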

type HealthManager

type HealthManager struct {
	// contains filtered or unexported fields
}

HealthManager monitors the health status of the storage cluster. The monitoring frequency is controlled by the Ticker passed in to Run method. On each tick, the HealthManager:

  1. Runs health checks on configured physical storages by performing a gRPC call to the health checking endpoint. If an error tracker is configured, it also considers its view of the node's health.
  2. Stores its health check results in the `node_status` table.
  3. Checks if the cluster's consensus of healthy nodes has changed by querying the `node_status` table for the results of the other Praefect instances. If so, it sends to the Updated channel to signal a change in the cluster status.

To determine the participants for the quorum, we use a lightweight service discovery protocol. A Praefect instance is deemed to be a voting member if it has a recent health check in the `node_status` table. Each Praefect node is identified by its host name and the provided stable ID. The stable ID should uniquely identify a Praefect instance on the host.

func NewHealthManager

func NewHealthManager(
	log logrus.FieldLogger,
	db glsql.Querier,
	praefectName string,
	clients HealthClients,
) *HealthManager

NewHealthManager returns a new health manager that monitors which nodes in the cluster are healthy.
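
A minimal sketch of constructing and running a HealthManager, assuming the logger, database handle, config and ticker are wired up elsewhere; running Run in a goroutine is illustrative, not a requirement.

func runHealthManager(ctx context.Context, log logrus.FieldLogger, db glsql.Querier, cfg config.Config, clients nodes.HealthClients, ticker helper.Ticker) *nodes.HealthManager {
	// Derive a stable Praefect name so this process can report its health
	// checks independently of other Praefect instances.
	hm := nodes.NewHealthManager(log, db, nodes.GeneratePraefectName(cfg, log), clients)
	go func() {
		if err := hm.Run(ctx, ticker); err != nil {
			log.WithError(err).Error("health manager stopped")
		}
	}()
	return hm
}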

func (*HealthManager) HealthConsensus

func (hm *HealthManager) HealthConsensus() map[string][]string

HealthConsensus returns a map of healthy nodes in each virtual storage as determined by the consensus of Praefect nodes. The returned set might include nodes which are not present in the local configuration if the cluster's consensus has deemed them healthy.

func (*HealthManager) HealthyNodes

func (hm *HealthManager) HealthyNodes() map[string][]string

HealthyNodes returns a map of healthy nodes in each virtual storage as seen by the latest local health check.

func (*HealthManager) Run

func (hm *HealthManager) Run(ctx context.Context, ticker helper.Ticker) error

Run runs the health check on every tick by the Ticker until the context is canceled. Returns the error from the context.

func (*HealthManager) Updated

func (hm *HealthManager) Updated() <-chan struct{}

Updated returns a channel that is sent to when the set of healthy nodes is updated. An update is also sent on the first check even if no nodes are healthy. The channel is buffered to allow the HealthManager to proceed with cluster health monitoring when the channel consumer is slow.
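
A sketch of a consumer reacting to health changes; combining Updated with HealthConsensus in a loop like this is an assumption about typical usage.

func watchClusterHealth(ctx context.Context, log logrus.FieldLogger, hm *nodes.HealthManager) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-hm.Updated():
			// Log the cluster-wide consensus whenever the set of healthy nodes changes.
			log.WithField("healthy_nodes", hm.HealthConsensus()).Info("cluster health changed")
		}
	}
}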

type Manager

type Manager interface {
	GetShard(ctx context.Context, virtualStorageName string) (Shard, error)
	// GetSyncedNode returns a random storage node based on the state of the replication.
	// It returns the primary if there are no up-to-date secondaries or an error occurs.
	GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error)
	// HealthyNodes returns healthy storages by virtual storage.
	HealthyNodes() map[string][]string
	// Nodes returns nodes by their virtual storages.
	Nodes() map[string][]Node
}

Manager is responsible for returning shards for virtual storages.
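
For illustration, a caller might route a request through a Manager as sketched below; the handling of the sentinel errors documented above and the decision to return the primary's connection are assumptions about usage, not part of the interface contract.

func primaryConn(ctx context.Context, mgr nodes.Manager, virtualStorage string) (*grpc.ClientConn, error) {
	shard, err := mgr.GetShard(ctx, virtualStorage)
	switch {
	case errors.Is(err, nodes.ErrVirtualStorageNotExist):
		// The virtual storage name is not known to the manager.
		return nil, err
	case errors.Is(err, nodes.ErrPrimaryNotHealthy):
		// The shard currently has no healthy primary; callers may retry later.
		return nil, err
	case err != nil:
		return nil, err
	}
	return shard.Primary.GetConnection(), nil
}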

type Mgr

type Mgr struct {
	// contains filtered or unexported fields
}

Mgr is a concrete type that adheres to the Manager interface.

func NewManager

func NewManager(
	log *logrus.Entry,
	c config.Config,
	db *sql.DB,
	csg datastore.ConsistentStoragesGetter,
	latencyHistogram prommetrics.HistogramVec,
	registry *protoregistry.Registry,
	errorTracker tracker.ErrorTracker,
	handshaker client.Handshaker,
) (*Mgr, error)

NewManager creates a new Mgr based on the virtual storage configs.

func (*Mgr) GetPrimary

func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage, _ string) (string, error)

GetPrimary returns the current primary of a repository. This is an adapter so the node manager can be used as a praefect.PrimaryGetter in newer code which is written to support repository-specific primaries.

func (*Mgr) GetShard

func (n *Mgr) GetShard(ctx context.Context, virtualStorageName string) (Shard, error)

GetShard retrieves a shard for a virtual storage name.

func (*Mgr) GetSyncedNode

func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error)

func (*Mgr) HealthyNodes

func (n *Mgr) HealthyNodes() map[string][]string

func (*Mgr) Nodes

func (n *Mgr) Nodes() map[string][]Node

func (*Mgr) Start

func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration)

Start bootstraps the node manager by running health checks on the nodes and kicking off the monitoring process. Start must be called before the Mgr can be used.
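
A minimal sketch of the Mgr lifecycle, assuming the dependencies required by NewManager are already constructed elsewhere; the intervals are arbitrary example values.

func startNodeManager(
	log *logrus.Entry,
	cfg config.Config,
	db *sql.DB,
	csg datastore.ConsistentStoragesGetter,
	hist prommetrics.HistogramVec,
	registry *protoregistry.Registry,
	errorTracker tracker.ErrorTracker,
	handshaker client.Handshaker,
) (*nodes.Mgr, error) {
	mgr, err := nodes.NewManager(log, cfg, db, csg, hist, registry, errorTracker, handshaker)
	if err != nil {
		return nil, err
	}
	// Start must be called before the manager is used: the first interval
	// drives the bootstrap health checks, the second the monitoring loop.
	mgr.Start(100*time.Millisecond, time.Minute)
	return mgr, nil
}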

type MockManager

type MockManager struct {
	Manager
	GetShardFunc func(string) (Shard, error)
	Storage      string
}

MockManager is a helper for tests that implements Manager and allows for parametrizing behavior.

func (*MockManager) GetShard

func (m *MockManager) GetShard(_ context.Context, storage string) (Shard, error)

func (*MockManager) HealthyNodes

func (m *MockManager) HealthyNodes() map[string][]string

HealthyNodes returns healthy nodes. This is implemented similarly to Nodes() and thus also requires the MockManager's Storage field to be set up.

func (*MockManager) Nodes

func (m *MockManager) Nodes() map[string][]Node

Nodes returns the nodes contained in the shard returned by GetShardFunc. Note that this mock only works if the MockManager was set up with a Storage, as GetShardFunc will be called with that storage as its parameter.
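
A sketch of a test double built from MockManager and MockNode; the storage name is a placeholder, and setting Storage to the same value GetShardFunc is called with follows the note above.

func newTestManager() *nodes.MockManager {
	primary := &nodes.MockNode{
		GetStorageMethod: func() string { return "storage-1" },
		Healthy:          true,
	}
	return &nodes.MockManager{
		// Storage must be set for Nodes() and HealthyNodes() to work, since
		// GetShardFunc is called with this storage name.
		Storage: "storage-1",
		GetShardFunc: func(string) (nodes.Shard, error) {
			return nodes.Shard{Primary: primary}, nil
		},
	}
}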

type MockNode

type MockNode struct {
	Node
	GetStorageMethod func() string
	Conn             *grpc.ClientConn
	Healthy          bool
}

MockNode is a helper for tests that implements Node and allows for parametrizing behavior.

func (*MockNode) GetAddress

func (m *MockNode) GetAddress() string

func (*MockNode) GetConnection

func (m *MockNode) GetConnection() *grpc.ClientConn

func (*MockNode) GetStorage

func (m *MockNode) GetStorage() string

func (*MockNode) GetToken

func (m *MockNode) GetToken() string

func (*MockNode) IsHealthy

func (m *MockNode) IsHealthy() bool

type Node

type Node interface {
	GetStorage() string
	GetAddress() string
	GetToken() string
	GetConnection() *grpc.ClientConn
	// IsHealthy reports whether the node is healthy and can handle requests.
	// A node is considered healthy if the last 'healthcheckThreshold' checks were positive.
	IsHealthy() bool
	// CheckHealth executes a health check for the node and tracks the last 'healthcheckThreshold' checks for it.
	CheckHealth(context.Context) (bool, error)
}

Node represents a node's metadata as well as a connection to it.
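
As an illustration, a manager's nodes could be probed once by hand as below; the real Mgr runs its own monitoring, so a manual sweep like this is purely a sketch.

func probeNodes(ctx context.Context, log logrus.FieldLogger, mgr nodes.Manager) {
	for virtualStorage, ns := range mgr.Nodes() {
		for _, node := range ns {
			healthy, err := node.CheckHealth(ctx)
			if err != nil {
				log.WithError(err).WithField("storage", node.GetStorage()).Warn("health check failed")
				continue
			}
			log.WithFields(logrus.Fields{
				"virtual_storage": virtualStorage,
				"storage":         node.GetStorage(),
				"healthy":         healthy,
			}).Info("node probed")
		}
	}
}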

type PerRepositoryElector

type PerRepositoryElector struct {
	// contains filtered or unexported fields
}

PerRepositoryElector implements an elector that selects a primary for each repository. It elects a healthy node with the most recent generation as the primary. If all nodes are on the same generation, it picks one randomly to balance repositories in a simple fashion.

func NewPerRepositoryElector

func NewPerRepositoryElector(log logrus.FieldLogger, db glsql.Querier) *PerRepositoryElector

NewPerRepositoryElector returns a new per repository primary elector.

func (*PerRepositoryElector) GetPrimary

func (pr *PerRepositoryElector) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error)

GetPrimary returns the primary storage of a repository.

func (*PerRepositoryElector) Run

func (pr *PerRepositoryElector) Run(ctx context.Context, trigger <-chan struct{}) error

Run listens on the trigger channel for updates. On each update, it tries to elect new primaries for repositories which have an unhealthy primary. Blocks until the context is canceled or the trigger channel is closed. Returns the error from the context.
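
A sketch of driving the elector from cluster health updates; feeding the HealthManager's Updated channel into Run as the trigger is an assumption based on the two APIs documented on this page.

func runElector(ctx context.Context, log logrus.FieldLogger, db glsql.Querier, hm *nodes.HealthManager) *nodes.PerRepositoryElector {
	elector := nodes.NewPerRepositoryElector(log, db)
	go func() {
		// Re-run elections whenever the set of healthy nodes changes.
		if err := elector.Run(ctx, hm.Updated()); err != nil {
			log.WithError(err).Error("per-repository elector stopped")
		}
	}()
	return elector
}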

type Shard

type Shard struct {
	Primary     Node
	Secondaries []Node
}

Shard is a primary with a set of secondaries.

func (Shard) GetHealthySecondaries

func (s Shard) GetHealthySecondaries() []Node

GetHealthySecondaries returns all secondaries of the shard which are currently known to be healthy.

func (Shard) GetNode

func (s Shard) GetNode(storage string) (Node, error)
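
A sketch of fanning a write out over a shard; only the Shard accessors used below are documented above, the surrounding replication flow is assumed for illustration.

func replicateToShard(ctx context.Context, mgr nodes.Manager, virtualStorage string) error {
	shard, err := mgr.GetShard(ctx, virtualStorage)
	if err != nil {
		return err
	}
	// Apply the change on the primary first.
	_ = shard.Primary.GetConnection() // issue the mutating RPC here
	// Then replicate to every secondary currently known to be healthy.
	for _, secondary := range shard.GetHealthySecondaries() {
		_ = secondary.GetConnection() // replicate the change here
	}
	return nil
}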
