inventory

package
v1.2.3-fred.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const SHARDS = 128

Variables

This section is empty.

Functions

func SendHeartbeat

func SendHeartbeat(ctx context.Context, handle DownstreamHandle, hb proto.InventoryHeartbeat, retry retryutils.Retry)

Types

type Auth

type Auth interface {
	UpsertNode(context.Context, types.Server) (*types.KeepAlive, error)
	KeepAliveServer(context.Context, types.KeepAlive) error
}

Auth is an interface representing the subset of the auth API that must be made available to the controller in order for it to be able to handle control streams.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller manages the inventory control streams registered with a given auth instance. Incoming messages are processed by invoking the appropriate methods on the Auth interface.

func NewController

func NewController(auth Auth, opts ...ControllerOption) *Controller

NewController sets up a new controller instance.

func (*Controller) Close

func (c *Controller) Close() error

Close terminates all control streams registered with this controller. Control streams registered after Close() is called are closed immediately.

func (*Controller) GetControlStream

func (c *Controller) GetControlStream(serverID string) (handle UpstreamHandle, ok bool)

GetControlStream gets a control stream for the given server ID if one exists (if multiple control streams exist one is selected pseudorandomly).

func (*Controller) Iter

func (c *Controller) Iter(fn func(UpstreamHandle))

Iter iterates across all handles registered with this controller. note: if multiple handles are registered for a given server, only one handle is selected pseudorandomly to be observed.

func (*Controller) RegisterControlStream

func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryControlStream, hello proto.UpstreamInventoryHello)

RegisterControlStream registers a new control stream with the controller.

type ControllerOption

type ControllerOption func(c *controllerOptions)

type DownstreamCreateFunc

type DownstreamCreateFunc func(ctx context.Context) (client.DownstreamInventoryControlStream, error)

DownstreamCreateFunc is a function that creates a downstream inventory control stream.

type DownstreamHandle

type DownstreamHandle interface {
	// Sender is used to asynchronously access a send-only reference to the current control
	// stream instance. If not currently healthy, this blocks indefinitely until a healthy control
	// stream is established.
	Sender() <-chan DownstreamSender
	// RegisterPingHandler registers a handler for downstream ping messages, returning
	// a de-registration function.
	RegisterPingHandler(DownstreamPingHandler) (unregister func())
	// CloseContext gets the close context of the downstream handle.
	CloseContext() context.Context
	// Close closes the downstream handle.
	Close() error
}

DownstreamHandle is a persistent handle used to interact with the current downstream half of the inventory control stream. This handle automatically re-creates the control stream if it fails. The latest (or next, if currently unhealthy) control stream send-half can be accessed/awaited via the Sender() channel. The intended usage pattern is that handlers for incoming messages are registered once, while components that need to send messages should re-acquire a new sender each time the old one fails. If send logic cares about auth server version, make sure to re-check the version *for each* sender, since different streams may be connected to different auth servers.

func NewDownstreamHandle

NewDownstreamHandle creates a new downstream inventory control handle which will create control streams via the supplied create func and manage hello exchange with the supplied upstream hello.

type DownstreamPingHandler

type DownstreamPingHandler func(sender DownstreamSender, msg proto.DownstreamInventoryPing)

DownstreamPingHandler is a function that handles ping messages that come down the inventory control stream.

type DownstreamSender

type DownstreamSender interface {
	// Send sends a message up the control stream.
	Send(ctx context.Context, msg proto.UpstreamInventoryMessage) error
	// Hello gets the cached downstream hello that was sent by the auth server
	// when the stream was initialized.
	Hello() proto.DownstreamInventoryHello
	// Done signals closure of the underlying stream.
	Done() <-chan struct{}
}

DownstreamSender is a send-only reference to the downstream half of an inventory control stream. Components that require use of the inventory control stream should accept a DownstreamHandle instead, and take a reference to the sender via the Sender() method.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a sharded key-value store that manages inventory control handles.

note: the sharding here may not really be necessary. sharding does improve perf under high combined read/write load, but perf isn't terrible without sharding (~2.5s vs ~0.5s in the basic benchmark). we've previously seen outages due to contention on a similar structure in the event fanout system, and I opted to shard here as well since the expected load on startup is similar to that system (though the fanout system performs more memory allocation under lock, which I suspect is why it has worse single-lock perf despite being otherwise quite similar).

func NewStore

func NewStore() *Store

NewStore creates a new inventory control handle store.

func (*Store) Get

func (s *Store) Get(serverID string) (handle UpstreamHandle, ok bool)

Get attempts to load a handle for the given server ID. note: if multiple handles exist for a given server, the returned handle is selected pseudorandomly from the available set.

func (*Store) Insert

func (s *Store) Insert(handle UpstreamHandle)

Insert adds a new handle to the store.

func (*Store) Iter

func (s *Store) Iter(fn func(UpstreamHandle))

Iter iterates across all handles registered with this store. note: if multiple handles are registered for a given server, only one handle is selected pseudorandomly to be observed.

func (*Store) Len

func (s *Store) Len() int

Len returns the count of currently registered servers (servers with multiple handles registered still only count as one).

func (*Store) Remove

func (s *Store) Remove(handle UpstreamHandle)

Remove removes the handle from the store.

type UpstreamHandle

type UpstreamHandle interface {
	client.UpstreamInventoryControlStream
	// Hello gets the cached upstream hello that was used to initialize the stream.
	Hello() proto.UpstreamInventoryHello

	Ping(ctx context.Context) (d time.Duration, err error)
	// HasService is a helper for checking if a given service is associated with this
	// stream.
	HasService(types.SystemRole) bool
}

UpstreamHandle is the primary mechanism for interacting with a fully initialized upstream control stream. The hello message cached in this handle has already passed through the auth layer, meaning that it represents the verified identity and capabilities of the remote entity.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL