Documentation ¶
Index ¶
Constants ¶
const SHARDS = 128
Variables ¶
This section is empty.
Functions ¶
func SendHeartbeat ¶
func SendHeartbeat(ctx context.Context, handle DownstreamHandle, hb proto.InventoryHeartbeat, retry retryutils.Retry)
Types ¶
type Auth ¶
type Auth interface { UpsertNode(context.Context, types.Server) (*types.KeepAlive, error) KeepAliveServer(context.Context, types.KeepAlive) error }
Auth is an interface representing the subset of the auth API that must be made available to the controller in order for it to be able to handle control streams.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller manages the inventory control streams registered with a given auth instance. Incoming messages are processed by invoking the appropriate methods on the Auth interface.
func NewController ¶
func NewController(auth Auth, opts ...ControllerOption) *Controller
NewController sets up a new controller instance.
func (*Controller) Close ¶
func (c *Controller) Close() error
Close terminates all control streams registered with this controller. Control streams registered after Close() is called are closed immediately.
func (*Controller) GetControlStream ¶
func (c *Controller) GetControlStream(serverID string) (handle UpstreamHandle, ok bool)
GetControlStream gets a control stream for the given server ID if one exists (if multiple control streams exist one is selected pseudorandomly).
func (*Controller) Iter ¶
func (c *Controller) Iter(fn func(UpstreamHandle))
Iter iterates across all handles registered with this controller. note: if multiple handles are registered for a given server, only one handle is selected pseudorandomly to be observed.
func (*Controller) RegisterControlStream ¶
func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello)
RegisterControlStream registers a new control stream with the controller.
type ControllerOption ¶
type ControllerOption func(c *controllerOptions)
type DownstreamCreateFunc ¶
type DownstreamCreateFunc func(ctx context.Context) (client.DownstreamInventoryControlStream, error)
DownstreamCreateFunc is a function that creates a downstream inventory control stream.
type DownstreamHandle ¶
type DownstreamHandle interface { // Sender is used to asynchronously access a send-only reference to the current control // stream instance. If not currently healthy, this blocks indefinitely until a healthy control // stream is established. Sender() <-chan DownstreamSender // RegisterPingHandler registers a handler for downstream ping messages, returning // a de-registration function. RegisterPingHandler(DownstreamPingHandler) (unregister func()) // CloseContext gets the close context of the downstream handle. CloseContext() context.Context // Close closes the downstream handle. Close() error }
DownstreamHandle is a persistent handle used to interact with the current downstream half of the inventory control stream. This handle automatically re-creates the control stream if it fails. The latest (or next, if currently unhealthy) control stream send-half can be accessed/awaited via the Sender() channel. The intended usage pattern is that handlers for incoming messages are registered once, while components that need to send messages should re-acquire a new sender each time the old one fails. If send logic cares about auth server version, make sure to re-check the version *for each* sender, since different streams may be connected to different auth servers.
func NewDownstreamHandle ¶
func NewDownstreamHandle(fn DownstreamCreateFunc, hello proto.UpstreamInventoryHello) DownstreamHandle
NewDownstreamHandle creates a new downstream inventory control handle which will create control streams via the supplied create func and manage hello exchange with the supplied upstream hello.
type DownstreamPingHandler ¶
type DownstreamPingHandler func(sender DownstreamSender, msg proto.DownstreamInventoryPing)
DownstreamPingHandler is a function that handles ping messages that come down the inventory control stream.
type DownstreamSender ¶
type DownstreamSender interface { // Send sends a message up the control stream. Send(ctx context.Context, msg proto.UpstreamInventoryMessage) error // Hello gets the cached downstream hello that was sent by the auth server // when the stream was initialized. Hello() proto.DownstreamInventoryHello // Done signals closure of the underlying stream. Done() <-chan struct{} }
DownstreamSender is a send-only reference to the downstream half of an inventory control stream. Components that require use of the inventory control stream should accept a DownstreamHandle instead, and take a reference to the sender via the Sender() method.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a sharded key-value store that manages inventory control handles.
note: the sharding here may not really be necessary. sharding does improve perf under high combined read/write load, but perf isn't terrible without sharding (~2.5s vs ~0.5s in the basic benchmark). we've previously seen outages due to contention on a similar structure in the event fanout system, and I opted to shard here as well since the expected load on startup is similar to that system (though the fanout system performs more memory allocation under lock, which I suspect is why it has worse single-lock perf despite being otherwise quite similar).
func (*Store) Get ¶
func (s *Store) Get(serverID string) (handle UpstreamHandle, ok bool)
Get attempts to load a handle for the given server ID. note: if multiple handles exist for a given server, the returned handle is selected pseudorandomly from the available set.
func (*Store) Insert ¶
func (s *Store) Insert(handle UpstreamHandle)
Insert adds a new handle to the store.
func (*Store) Iter ¶
func (s *Store) Iter(fn func(UpstreamHandle))
Iter iterates across all handles registered with this store. note: if multiple handles are registered for a given server, only one handle is selected pseudorandomly to be observed.
func (*Store) Len ¶
Len returns the count of currently registered servers (servers with multiple handles registered still only count as one).
func (*Store) Remove ¶
func (s *Store) Remove(handle UpstreamHandle)
Remove removes the handle from the store.
type UpstreamHandle ¶
type UpstreamHandle interface { client.UpstreamInventoryControlStream // Hello gets the cached upstream hello that was used to initialize the stream. Hello() proto.UpstreamInventoryHello Ping(ctx context.Context) (d time.Duration, err error) // HasService is a helper for checking if a given service is associated with this // stream. HasService(types.SystemRole) bool }
UpstreamHandle is the primary mechanism for interacting with a fully initialized upstream control stream. The hello message cached in this handle has already passed through the auth layer, meaning that it represents the verified identity and capabilities of the remote entity.