Documentation
¶
Index ¶
- func TestPeering(peerName string, state pbpeering.PeeringState, meta map[string]string) *pbpeering.Peering
- func TestPeeringToken(peerID string) structs.PeeringToken
- type Apply
- type Backend
- type BidirectionalStream
- type Config
- type HandleStreamRequest
- type LeaderAddress
- type MaterializedViewStore
- type MockClient
- type MockStream
- func (s *MockStream) Context() context.Context
- func (s *MockStream) Recv() (*pbpeering.ReplicationMessage, error)
- func (s *MockStream) RecvMsg(m interface{}) error
- func (s *MockStream) Send(r *pbpeering.ReplicationMessage) error
- func (s *MockStream) SendHeader(metadata.MD) error
- func (s *MockStream) SendMsg(m interface{}) error
- func (s *MockStream) SetHeader(metadata.MD) error
- func (s *MockStream) SetTrailer(metadata.MD)
- type Service
- func (s *Service) ConnectedStreams() map[string]chan struct{}
- func (s *Service) DrainStream(req HandleStreamRequest)
- func (s *Service) Establish(ctx context.Context, req *pbpeering.EstablishRequest) (*pbpeering.EstablishResponse, error)
- func (s *Service) GenerateToken(ctx context.Context, req *pbpeering.GenerateTokenRequest) (*pbpeering.GenerateTokenResponse, error)
- func (s *Service) HandleStream(req HandleStreamRequest) error
- func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error)
- func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error)
- func (s *Service) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error)
- func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error)
- func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error
- func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool)
- func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error)
- func (s *Service) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error)
- type Store
- type StreamStatus
- type Subscriber
- type SubscriptionBackend
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func TestPeering ¶
func TestPeering(peerName string, state pbpeering.PeeringState, meta map[string]string) *pbpeering.Peering
TestPeering is a test utility for generating a pbpeering.Peering with valid data along with the given peerName, state and meta.
func TestPeeringToken ¶
func TestPeeringToken(peerID string) structs.PeeringToken
TestPeeringToken is a test utility for generating a valid peering token with the given peerID for use in test cases
Types ¶
type Apply ¶
type Apply interface { PeeringWrite(req *pbpeering.PeeringWriteRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error CatalogRegister(req *structs.RegisterRequest) error CatalogDeregister(req *structs.DeregisterRequest) error }
Apply provides a write-only interface for persisting Peering data.
type Backend ¶
type Backend interface { // Forward should forward the request to the leader when necessary. Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error) // GetAgentCACertificates returns the CA certificates to be returned in the peering token data GetAgentCACertificates() ([]string, error) // GetServerAddresses returns the addresses used for establishing a peering connection GetServerAddresses() ([]string, error) // GetServerName returns the SNI to be returned in the peering token data which // will be used by peers when establishing peering connections over TLS. GetServerName() string // EncodeToken packages a peering token into a slice of bytes. EncodeToken(tok *structs.PeeringToken) ([]byte, error) // DecodeToken unpackages a peering token from a slice of bytes. DecodeToken([]byte) (*structs.PeeringToken, error) EnterpriseCheckPartitions(partition string) error EnterpriseCheckNamespaces(namespace string) error Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) // IsLeader indicates whether the consul server is in a leader state or not. IsLeader() bool Store() Store Apply() Apply LeaderAddress() LeaderAddress }
Backend defines the core integrations the Peering endpoint depends on. A functional implementation will integrate with various subcomponents of Consul such as the State store for reading and writing data, the CA machinery for providing access to CA data and the RPC system for forwarding requests to other servers.
type BidirectionalStream ¶
type BidirectionalStream interface { Send(*pbpeering.ReplicationMessage) error Recv() (*pbpeering.ReplicationMessage, error) Context() context.Context }
type HandleStreamRequest ¶
type HandleStreamRequest struct { // LocalID is the UUID for the peering in the local Consul datacenter. LocalID string // RemoteID is the UUID for the peering from the perspective of the peer. RemoteID string // PeerName is the name of the peering. PeerName string // Partition is the local partition associated with the peer. Partition string // Stream is the open stream to the peer cluster. Stream BidirectionalStream }
type LeaderAddress ¶
type LeaderAddress interface { // Set is called on a raft.LeaderObservation in a goroutine in the consul server; // see trackLeaderChanges() Set(leaderAddr string) // Get provides the best hint for the current address of the leader. // There is no guarantee that this is the actual address of the leader. Get() string }
LeaderAddress provides a way for the consul server to update the peering service about the server's leadership status. Server addresses should look like: ip:port
type MaterializedViewStore ¶
type MaterializedViewStore interface { Get(ctx context.Context, req submatview.Request) (submatview.Result, error) Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error }
type MockClient ¶
type MockClient struct { ErrCh chan error ReplicationStream *MockStream // contains filtered or unexported fields }
func NewMockClient ¶
func NewMockClient(ctx context.Context) *MockClient
func (*MockClient) Close ¶
func (c *MockClient) Close()
func (*MockClient) Recv ¶
func (c *MockClient) Recv() (*pbpeering.ReplicationMessage, error)
func (*MockClient) RecvWithTimeout ¶
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error)
func (*MockClient) Send ¶
func (c *MockClient) Send(r *pbpeering.ReplicationMessage) error
type MockStream ¶
type MockStream struct {
// contains filtered or unexported fields
}
MockStream mocks pbpeering.PeeringService_StreamResourcesServer.
func (*MockStream) Context ¶
func (s *MockStream) Context() context.Context
Context implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Recv ¶
func (s *MockStream) Recv() (*pbpeering.ReplicationMessage, error)
Recv implements pbpeering.PeeringService_StreamResourcesServer
func (*MockStream) RecvMsg ¶
func (s *MockStream) RecvMsg(m interface{}) error
RecvMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Send ¶
func (s *MockStream) Send(r *pbpeering.ReplicationMessage) error
Send implements pbpeering.PeeringService_StreamResourcesServer
func (*MockStream) SendHeader ¶
func (s *MockStream) SendHeader(metadata.MD) error
SendHeader implements grpc.ServerStream
func (*MockStream) SendMsg ¶
func (s *MockStream) SendMsg(m interface{}) error
SendMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) SetHeader ¶
func (s *MockStream) SetHeader(metadata.MD) error
SetHeader implements grpc.ServerStream
func (*MockStream) SetTrailer ¶
func (s *MockStream) SetTrailer(metadata.MD)
SetTrailer implements grpc.ServerStream
type Service ¶
type Service struct { Backend Backend // contains filtered or unexported fields }
Service implements pbpeering.PeeringService to provide RPC operations for managing peering relationships.
func NewService ¶
func (*Service) ConnectedStreams ¶
func (s *Service) ConnectedStreams() map[string]chan struct{}
ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
func (*Service) DrainStream ¶
func (s *Service) DrainStream(req HandleStreamRequest)
DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message. Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
func (*Service) Establish ¶
func (s *Service) Establish( ctx context.Context, req *pbpeering.EstablishRequest, ) (*pbpeering.EstablishResponse, error)
Establish implements the PeeringService RPC method to finalize peering registration. Given a valid token output from a peer's GenerateToken endpoint, a peering is registered.
func (*Service) GenerateToken ¶
func (s *Service) GenerateToken( ctx context.Context, req *pbpeering.GenerateTokenRequest, ) (*pbpeering.GenerateTokenResponse, error)
GenerateToken implements the PeeringService RPC method to generate a peering token which is the initial step in establishing a peering relationship with other Consul clusters.
func (*Service) HandleStream ¶
func (s *Service) HandleStream(req HandleStreamRequest) error
The LocalID provided in the request is the locally-generated identifier for the peering. The RemoteID is an identifier that the remote peer recognizes for the peering.
func (*Service) PeeringDelete ¶
func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error)
func (*Service) PeeringList ¶
func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error)
func (*Service) PeeringRead ¶
func (s *Service) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error)
func (*Service) PeeringWrite ¶
func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error)
TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. Consider removing it if we can find another way to populate the state store in peering_endpoint_test.go.
func (*Service) StreamResources ¶
func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error
StreamResources handles incoming streaming connections.
func (*Service) StreamStatus ¶
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool)
func (*Service) TrustBundleListByService ¶
func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error)
func (*Service) TrustBundleRead ¶
func (s *Service) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error)
type Store ¶
type Store interface { PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) AbandonCh() <-chan struct{} }
Store provides a read-only interface for querying Peering data.
type StreamStatus ¶
type StreamStatus struct { // Connected is true when there is an open stream for the peer. Connected bool // If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero. DisconnectTime time.Time // LastAck tracks the time we received the last ACK for a resource replicated TO the peer. LastAck time.Time // LastNack tracks the time we received the last NACK for a resource replicated to the peer. LastNack time.Time // LastNackMessage tracks the reported error message associated with the last NACK from a peer. LastNackMessage string // LastSendError tracks the time of the last error sending into the stream. LastSendError time.Time // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string // LastReceiveSuccess tracks the time we last successfully stored a resource replicated FROM the peer. LastReceiveSuccess time.Time // LastReceiveError tracks either: // - The time we failed to store a resource replicated FROM the peer. // - The time of the last error when receiving from the stream. LastReceiveError time.Time // LastReceiveErrorMessage tracks either: // - The error message when we failed to store a resource replicated FROM the peer. // - The last error message when receiving from the stream. LastReceiveErrorMessage string }
StreamStatus contains information about the replication stream to a peer cluster. TODO(peering): There are a lot of fields here...
type Subscriber ¶
type Subscriber interface {
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}
type SubscriptionBackend ¶
type SubscriptionBackend interface { Subscriber Store() Store }