peering

package
v1.13.0-alpha2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2022 License: MPL-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func TestPeering

func TestPeering(peerName string, state pbpeering.PeeringState, meta map[string]string) *pbpeering.Peering

TestPeering is a test utility for generating a pbpeering.Peering with valid data along with the peerName, state and index.

func TestPeeringToken

func TestPeeringToken(peerID string) structs.PeeringToken

TestPeeringToken is a test utility for generating a valid peering token with the given peerID for use in test cases

Types

type Apply

type Apply interface {
	PeeringWrite(req *pbpeering.PeeringWriteRequest) error
	PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
	PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
	CatalogRegister(req *structs.RegisterRequest) error
	CatalogDeregister(req *structs.DeregisterRequest) error
}

Apply provides a write-only interface for persisting Peering data.

type Backend

type Backend interface {
	// Forward should forward the request to the leader when necessary.
	Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error)

	// GetAgentCACertificates returns the CA certificate to be returned in the peering token data
	GetAgentCACertificates() ([]string, error)

	// GetServerAddresses returns the addresses used for establishing a peering connection
	GetServerAddresses() ([]string, error)

	// GetServerName returns the SNI to be returned in the peering token data which
	// will be used by peers when establishing peering connections over TLS.
	GetServerName() string

	// EncodeToken packages a peering token into a slice of bytes.
	EncodeToken(tok *structs.PeeringToken) ([]byte, error)

	// DecodeToken unpackages a peering token from a slice of bytes.
	DecodeToken([]byte) (*structs.PeeringToken, error)

	EnterpriseCheckPartitions(partition string) error

	EnterpriseCheckNamespaces(namespace string) error

	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)

	// IsLeader indicates whether the consul server is in a leader state or not.
	IsLeader() bool

	Store() Store
	Apply() Apply
	LeaderAddress() LeaderAddress
}

Backend defines the core integrations the Peering endpoint depends on. A functional implementation will integrate with various subcomponents of Consul such as the State store for reading and writing data, the CA machinery for providing access to CA data and the RPC system for forwarding requests to other servers.

type BidirectionalStream

type BidirectionalStream interface {
	Send(*pbpeering.ReplicationMessage) error
	Recv() (*pbpeering.ReplicationMessage, error)
	Context() context.Context
}

type Config

type Config struct {
	Datacenter     string
	ConnectEnabled bool
}

type HandleStreamRequest

type HandleStreamRequest struct {
	// LocalID is the UUID for the peering in the local Consul datacenter.
	LocalID string

	// RemoteID is the UUID for the peering from the perspective of the peer.
	RemoteID string

	// PeerName is the name of the peering.
	PeerName string

	// Partition is the local partition associated with the peer.
	Partition string

	// Stream is the open stream to the peer cluster.
	Stream BidirectionalStream
}

type LeaderAddress

type LeaderAddress interface {
	// Set is called on a raft.LeaderObservation in a go routine in the consul server;
	// see trackLeaderChanges()
	Set(leaderAddr string)

	// Get provides the best hint for the current address of the leader.
	// There is no guarantee that this is the actual address of the leader.
	Get() string
}

LeaderAddress provides a way for the consul server to update the peering service about the server's leadership status. Server addresses should look like: ip:port

type MaterializedViewStore

type MaterializedViewStore interface {
	Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
	Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}

type MockClient

type MockClient struct {
	ErrCh             chan error
	ReplicationStream *MockStream
	// contains filtered or unexported fields
}

func NewMockClient

func NewMockClient(ctx context.Context) *MockClient

func (*MockClient) Close

func (c *MockClient) Close()

func (*MockClient) Recv

func (*MockClient) RecvWithTimeout

func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error)

func (*MockClient) Send

type MockStream

type MockStream struct {
	// contains filtered or unexported fields
}

MockStream mocks peering.PeeringService_StreamResourcesServer

func (*MockStream) Context

func (s *MockStream) Context() context.Context

Context implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) Recv

Recv implements pbpeering.PeeringService_StreamResourcesServer

func (*MockStream) RecvMsg

func (s *MockStream) RecvMsg(m interface{}) error

RecvMsg implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) Send

Send implements pbpeering.PeeringService_StreamResourcesServer

func (*MockStream) SendHeader

func (s *MockStream) SendHeader(metadata.MD) error

SendHeader implements grpc.ServerStream

func (*MockStream) SendMsg

func (s *MockStream) SendMsg(m interface{}) error

SendMsg implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) SetHeader

func (s *MockStream) SetHeader(metadata.MD) error

SetHeader implements grpc.ServerStream

func (*MockStream) SetTrailer

func (s *MockStream) SetTrailer(metadata.MD)

SetTrailer implements grpc.ServerStream

type Service

type Service struct {
	Backend Backend
	// contains filtered or unexported fields
}

Service implements pbpeering.PeeringService to provide RPC operations for managing peering relationships.

func NewService

func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service

func (*Service) ConnectedStreams

func (s *Service) ConnectedStreams() map[string]chan struct{}

ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.

func (*Service) DrainStream

func (s *Service) DrainStream(req HandleStreamRequest)

DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. Tearing down the connection too quickly can lead our peer receiving a context cancellation error before the stream termination message. Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.

func (*Service) Establish

Establish implements the PeeringService RPC method to finalize peering registration. Given a valid token output from a peer's GenerateToken endpoint, a peering is registered.

func (*Service) GenerateToken

GenerateToken implements the PeeringService RPC method to generate a peering token which is the initial step in establishing a peering relationship with other Consul clusters.

func (*Service) HandleStream

func (s *Service) HandleStream(req HandleStreamRequest) error

The localID provided is the locally-generated identifier for the peering. The remoteID is an identifier that the remote peer recognizes for the peering.

func (*Service) PeeringList

func (*Service) PeeringRead

func (*Service) PeeringWrite

TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. Consider removing if we can find another way to populate state store in peering_endpoint_test.go

func (*Service) StreamResources

func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error

StreamResources handles incoming streaming connections.

func (*Service) StreamStatus

func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool)

func (*Service) TrustBundleListByService

TODO(peering): rename rpc & request/response to drop the "service" part

type Store

type Store interface {
	PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
	PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
	PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
	PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
	PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
	ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
	ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
	CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
	NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
	CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
	TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
	AbandonCh() <-chan struct{}
}

Store provides a read-only interface for querying Peering data.

type StreamStatus

type StreamStatus struct {
	// Connected is true when there is an open stream for the peer.
	Connected bool

	// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
	DisconnectTime time.Time

	// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
	LastAck time.Time

	// LastNack tracks the time we received the last NACK for a resource replicated to the peer.
	LastNack time.Time

	// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
	LastNackMessage string

	// LastSendError tracks the time of the last error sending into the stream.
	LastSendError time.Time

	// LastSendErrorMessage tracks the last error message when sending into the stream.
	LastSendErrorMessage string

	// LastReceiveSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
	LastReceiveSuccess time.Time

	// LastReceiveError tracks either:
	// - The time we failed to store a resource replicated FROM the peer.
	// - The time of the last error when receiving from the stream.
	LastReceiveError time.Time

	// LastReceiveError tracks either:
	// - The error message when we failed to store a resource replicated FROM the peer.
	// - The last error message when receiving from the stream.
	LastReceiveErrorMessage string
}

StreamStatus contains information about the replication stream to a peer cluster. TODO(peering): There's a lot of fields here...

type Subscriber

type Subscriber interface {
	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}

type SubscriptionBackend

type SubscriptionBackend interface {
	Subscriber
	Store() Store
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL