Documentation ¶
Index ¶
- type ACLResolver
- type Backend
- type BidirectionalStream
- type Config
- type HandleStreamRequest
- type MaterializedViewStore
- type MockACLResolver
- type MockClient
- func (c *MockClient) Close()
- func (c *MockClient) DrainStream(t *testing.T)
- func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error)
- func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error)
- func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error
- type MockStream
- func (s *MockStream) Context() context.Context
- func (s *MockStream) Recv() (*pbpeerstream.ReplicationMessage, error)
- func (s *MockStream) RecvMsg(m interface{}) error
- func (s *MockStream) Send(r *pbpeerstream.ReplicationMessage) error
- func (s *MockStream) SendHeader(metadata.MD) error
- func (s *MockStream) SendMsg(m interface{}) error
- func (s *MockStream) SetHeader(metadata.MD) error
- func (s *MockStream) SetTrailer(metadata.MD)
- type MutableStatus
- func (s *MutableStatus) Done() <-chan struct{}
- func (s *MutableStatus) GetExportedServicesCount() int
- func (s *MutableStatus) GetImportedServicesCount() int
- func (s *MutableStatus) GetStatus() Status
- func (s *MutableStatus) IsConnected() bool
- func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)
- func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)
- func (s *MutableStatus) TrackAck()
- func (s *MutableStatus) TrackConnected()
- func (s *MutableStatus) TrackDisconnectedDueToError(error string)
- func (s *MutableStatus) TrackDisconnectedGracefully()
- func (s *MutableStatus) TrackNack(msg string)
- func (s *MutableStatus) TrackRecvError(error string)
- func (s *MutableStatus) TrackRecvHeartbeat()
- func (s *MutableStatus) TrackRecvResourceSuccess()
- func (s *MutableStatus) TrackSendError(error string)
- func (s *MutableStatus) TrackSendSuccess()
- type Server
- func (s *Server) ConnectedStreams() map[string]chan struct{}
- func (s *Server) DrainStream(req HandleStreamRequest)
- func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error)
- func (s *Server) HandleStream(streamReq HandleStreamRequest) error
- func (s *Server) Register(grpcServer *grpc.Server)
- func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error
- func (s *Server) StreamStatus(peerID string) (resp Status, found bool)
- type StateStore
- type Status
- type Subscriber
- type SubscriptionBackend
- type Tracker
- func (t *Tracker) Connected(id string) (*MutableStatus, error)
- func (t *Tracker) ConnectedStreams() map[string]chan struct{}
- func (t *Tracker) DeleteStatus(id string)
- func (t *Tracker) DisconnectedDueToError(id string, error string)
- func (t *Tracker) DisconnectedGracefully(id string)
- func (t *Tracker) IsHealthy(s Status) bool
- func (t *Tracker) Register(id string) (*MutableStatus, error)
- func (t *Tracker) StreamStatus(id string) (resp Status, found bool)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ACLResolver ¶
type ACLResolver interface {
ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error)
}
type Backend ¶
type Backend interface { Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) // IsLeader indicates whether the consul server is in a leader state or not. IsLeader() bool // SetLeaderAddress is called on a raft.LeaderObservation in a go routine // in the consul server; see trackLeaderChanges() SetLeaderAddress(string) // GetLeaderAddress provides the best hint for the current address of the // leader. There is no guarantee that this is the actual address of the // leader. GetLeaderAddress() string ValidateProposedPeeringSecret(id string) (bool, error) PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error CatalogRegister(req *structs.RegisterRequest) error CatalogDeregister(req *structs.DeregisterRequest) error PeeringWrite(req *pbpeering.PeeringWriteRequest) error }
type BidirectionalStream ¶
type BidirectionalStream interface { Send(*pbpeerstream.ReplicationMessage) error Recv() (*pbpeerstream.ReplicationMessage, error) Context() context.Context }
type Config ¶
type Config struct { Backend Backend GetStore func() StateStore Logger hclog.Logger ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) ACLResolver ACLResolver // Datacenter of the Consul server this gRPC server is hosted on Datacenter string ConnectEnabled bool // contains filtered or unexported fields }
type HandleStreamRequest ¶
type HandleStreamRequest struct { // LocalID is the UUID for the peering in the local Consul datacenter. LocalID string // RemoteID is the UUID for the peering from the perspective of the peer. RemoteID string // PeerName is the name of the peering. PeerName string // Partition is the local partition associated with the peer. Partition string // Stream is the open stream to the peer cluster. Stream BidirectionalStream }
func (HandleStreamRequest) IsAcceptor ¶
func (r HandleStreamRequest) IsAcceptor() bool
type MaterializedViewStore ¶
type MaterializedViewStore interface { Get(ctx context.Context, req submatview.Request) (submatview.Result, error) Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error }
type MockACLResolver ¶
MockACLResolver is an autogenerated mock type for the ACLResolver type
func NewMockACLResolver ¶
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver
NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
func (*MockACLResolver) ResolveTokenAndDefaultMeta ¶
func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.EnterpriseMeta, _a2 *acl.AuthorizerContext) (resolver.Result, error)
ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2
type MockClient ¶
type MockClient struct { ErrCh chan error ReplicationStream *MockStream // contains filtered or unexported fields }
func NewMockClient ¶
func NewMockClient(ctx context.Context) *MockClient
func (*MockClient) Close ¶
func (c *MockClient) Close()
func (*MockClient) DrainStream ¶
func (c *MockClient) DrainStream(t *testing.T)
DrainStream reads messages from the stream until both the exported service list and trust bundle messages have been read. We do this because their ordering is nondeterministic.
func (*MockClient) Recv ¶
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error)
func (*MockClient) RecvWithTimeout ¶
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error)
func (*MockClient) Send ¶
func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error
type MockStream ¶
type MockStream struct {
// contains filtered or unexported fields
}
MockStream mocks pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) Context ¶
func (s *MockStream) Context() context.Context
Context implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Recv ¶
func (s *MockStream) Recv() (*pbpeerstream.ReplicationMessage, error)
Recv implements pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) RecvMsg ¶
func (s *MockStream) RecvMsg(m interface{}) error
RecvMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Send ¶
func (s *MockStream) Send(r *pbpeerstream.ReplicationMessage) error
Send implements pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) SendHeader ¶
func (s *MockStream) SendHeader(metadata.MD) error
SendHeader implements grpc.ServerStream
func (*MockStream) SendMsg ¶
func (s *MockStream) SendMsg(m interface{}) error
SendMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) SetHeader ¶
func (s *MockStream) SetHeader(metadata.MD) error
SetHeader implements grpc.ServerStream
func (*MockStream) SetTrailer ¶
func (s *MockStream) SetTrailer(metadata.MD)
SetTrailer implements grpc.ServerStream
type MutableStatus ¶
type MutableStatus struct { Status // contains filtered or unexported fields }
func (*MutableStatus) Done ¶
func (s *MutableStatus) Done() <-chan struct{}
func (*MutableStatus) GetExportedServicesCount ¶
func (s *MutableStatus) GetExportedServicesCount() int
func (*MutableStatus) GetImportedServicesCount ¶
func (s *MutableStatus) GetImportedServicesCount() int
func (*MutableStatus) GetStatus ¶
func (s *MutableStatus) GetStatus() Status
func (*MutableStatus) IsConnected ¶
func (s *MutableStatus) IsConnected() bool
func (*MutableStatus) SetExportedServices ¶
func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)
func (*MutableStatus) SetImportedServices ¶
func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)
func (*MutableStatus) TrackAck ¶
func (s *MutableStatus) TrackAck()
func (*MutableStatus) TrackConnected ¶
func (s *MutableStatus) TrackConnected()
func (*MutableStatus) TrackDisconnectedDueToError ¶
func (s *MutableStatus) TrackDisconnectedDueToError(error string)
TrackDisconnectedDueToError tracks when the stream was disconnected due to an error. For example the heartbeat timed out, or we couldn't send into the stream.
func (*MutableStatus) TrackDisconnectedGracefully ¶
func (s *MutableStatus) TrackDisconnectedGracefully()
TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected. For example, we got a terminated message, or we terminated the stream ourselves.
func (*MutableStatus) TrackNack ¶
func (s *MutableStatus) TrackNack(msg string)
func (*MutableStatus) TrackRecvError ¶
func (s *MutableStatus) TrackRecvError(error string)
func (*MutableStatus) TrackRecvHeartbeat ¶
func (s *MutableStatus) TrackRecvHeartbeat()
TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
func (*MutableStatus) TrackRecvResourceSuccess ¶
func (s *MutableStatus) TrackRecvResourceSuccess()
TrackRecvResourceSuccess tracks receiving a replicated resource.
func (*MutableStatus) TrackSendError ¶
func (s *MutableStatus) TrackSendError(error string)
func (*MutableStatus) TrackSendSuccess ¶
func (s *MutableStatus) TrackSendSuccess()
type Server ¶
func (*Server) ConnectedStreams ¶
ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
func (*Server) DrainStream ¶
func (s *Server) DrainStream(req HandleStreamRequest)
DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message. Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
func (*Server) ExchangeSecret ¶
func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error)
ExchangeSecret exchanges the one-time secret embedded in a peering token for a long-lived secret for use with the peering stream handler. This secret exchange prevents peering tokens from being reused.
Note that if the peering secret exchange fails, a peering token may need to be re-generated, since the one-time initiation secret may have been invalidated.
func (*Server) HandleStream ¶
func (s *Server) HandleStream(streamReq HandleStreamRequest) error
func (*Server) StreamResources ¶
func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error
StreamResources handles incoming streaming connections.
type StateStore ¶
type StateStore interface { PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) PeeringSecretsRead(ws memdb.WatchSet, peerID string) (*pbpeering.PeeringSecrets, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error) AbandonCh() <-chan struct{} }
StateStore provides a read-only interface for querying Peering data.
type Status ¶
type Status struct { // Connected is true when there is an open stream for the peer. Connected bool // NeverConnected is true for peerings that have never connected, false otherwise. NeverConnected bool // DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully. // If the stream is connected or it disconnected gracefully it will be empty. DisconnectErrorMessage string // If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero. DisconnectTime *time.Time // LastAck tracks the time we received the last ACK for a resource replicated TO the peer. LastAck *time.Time // LastNack tracks the time we received the last NACK for a resource replicated to the peer. LastNack *time.Time // LastNackMessage tracks the reported error message associated with the last NACK from a peer. LastNackMessage string // LastSendError tracks the time of the last error sending into the stream. LastSendError *time.Time // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string // LastSendSuccess tracks the time we last successfully sent a resource TO the peer. LastSendSuccess *time.Time // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. LastRecvHeartbeat *time.Time // LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. LastRecvResourceSuccess *time.Time // LastRecvError tracks either: // - The time we failed to store a resource replicated FROM the peer. // - The time of the last error when receiving from the stream. LastRecvError *time.Time // LastRecvErrorMessage tracks the last error message when receiving from the stream. 
LastRecvErrorMessage string // TODO(peering): consider keeping track of imported and exported services thru raft // ImportedServices keeps track of which service names are imported for the peer ImportedServices []string // ExportedServices keeps track of which service names a peer asks to export ExportedServices []string }
Status contains information about the replication stream to a peer cluster. TODO(peering): There are a lot of fields here...
func (*Status) GetExportedServicesCount ¶
func (*Status) GetImportedServicesCount ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}
type SubscriptionBackend ¶
type SubscriptionBackend interface { Subscriber }
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker contains a map of (PeerID -> MutableStatus). As streams are opened and closed we track details about their status.
func NewTracker ¶
func (*Tracker) Connected ¶
func (t *Tracker) Connected(id string) (*MutableStatus, error)
Connected registers a stream for a given peer, and marks it as connected. It also enforces that there is only one active stream for a peer.
func (*Tracker) ConnectedStreams ¶
func (*Tracker) DeleteStatus ¶
func (*Tracker) DisconnectedDueToError ¶
DisconnectedDueToError marks the peer id's stream status as disconnected due to an error.
func (*Tracker) DisconnectedGracefully ¶
DisconnectedGracefully marks the peer id's stream status as disconnected gracefully.
func (*Tracker) IsHealthy ¶
IsHealthy calculates the health of a peering status. We define a peering as unhealthy if its status has been in any of the following states for longer than the configured incomingHeartbeatTimeout.
- If it is disconnected
- If the last received Nack is newer than last received Ack
- If the last received error is newer than last received success
If none of these conditions apply, we call the peering healthy.