autoconfig

package
v8.0.0
Published: Oct 12, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package autoconfig contains a client for the auto-configuration streaming service.

This abstracts away the protocol details so that the Relay implementation only needs to know what environments it should add, update, or remove.

Constants

const (
	ActionInsert = Action("insert")
	ActionUpdate = Action("update")
	ActionDelete = Action("delete")
	ActionNoop   = Action("noop")
)
const (
	// PutEvent is the SSE event name corresponding to PutMessageData.
	PutEvent = "put"

	// PatchEvent is the SSE event name corresponding to PatchMessageData.
	PatchEvent = "patch"

	// DeleteEvent is the SSE event name corresponding to DeleteMessageData.
	DeleteEvent = "delete"

	// ReconnectEvent is the SSE event name for a message that forces a stream reconnect.
	ReconnectEvent = "reconnect"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action string

An Action represents actions generated in response to invoking MessageReceiver's methods, which a caller should handle. Returned Actions will obey the following order constraints:

  1. An item will never be Updated or Deleted without first being Inserted.
  2. An item may be Updated 0 or more times.
  3. An item will be Deleted exactly once.

At any point, a Noop may be emitted, in which case the caller may take no action.
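
The ordering constraints above mean a caller can apply each returned Action directly to its own state without extra bookkeeping. The following is a minimal sketch of that caller-side dispatch, not code from this package: `action`, its four constants, and `apply` are hypothetical local stand-ins (the real Action is a string type), and the store is a plain map.

```go
package main

import "fmt"

// action is a hypothetical local stand-in for the package's Action type.
// Only the four distinct values matter here, not their representation.
type action int

const (
	actionNoop action = iota
	actionInsert
	actionUpdate
	actionDelete
)

// apply mutates store according to one action and reports whether the
// store was touched. Insert and update both write the item; noop does nothing.
func apply(store map[string]string, a action, id, desc string) bool {
	switch a {
	case actionInsert, actionUpdate:
		store[id] = desc
		return true
	case actionDelete:
		delete(store, id)
		return true
	default: // actionNoop: nothing to do
		return false
	}
}

func main() {
	store := map[string]string{}
	apply(store, actionInsert, "env1", "Production v1")
	apply(store, actionUpdate, "env1", "Production v2")
	apply(store, actionNoop, "env1", "ignored")
	fmt.Println(store["env1"]) // prints: Production v2
	apply(store, actionDelete, "env1", "")
	fmt.Println(len(store)) // prints: 0
}
```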

type DeleteMessageData

type DeleteMessageData struct {
	// Path is currently always "environments/$ENVID".
	Path string `json:"path"`

	// Version is a number that must be greater than the last known version of the environment.
	Version int `json:"version"`
}

DeleteMessageData is the JSON data for an SSE message that removes an environment.
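
As an illustration of the payload shape, the sketch below decodes a "delete" message and pulls the environment ID out of the path. It uses a local copy of the struct, since only the field layout is taken from the documentation above. The helper name `parseDelete`, the sample payload, and the tolerance for a leading slash are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// deleteMessageData is a local copy of the documented struct shape.
type deleteMessageData struct {
	Path    string `json:"path"`
	Version int    `json:"version"`
}

// parseDelete (hypothetical helper) decodes a "delete" payload and returns
// the environment ID found in the path together with the version.
func parseDelete(raw []byte) (string, int, error) {
	var msg deleteMessageData
	if err := json.Unmarshal(raw, &msg); err != nil {
		return "", 0, err
	}
	// Assumption: accept an optional leading slash before "environments/".
	path := strings.TrimPrefix(msg.Path, "/")
	const prefix = "environments/"
	if !strings.HasPrefix(path, prefix) || len(path) == len(prefix) {
		return "", 0, fmt.Errorf("unexpected path %q", msg.Path)
	}
	return strings.TrimPrefix(path, prefix), msg.Version, nil
}

func main() {
	id, version, err := parseDelete([]byte(`{"path":"environments/abc123","version":7}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id, version) // prints: abc123 7
}
```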

type Item

type Item interface {
	// Describe is the human-readable identifier for the item, used in log messages.
	Describe() string
}

An Item is anything that can report its own ID and a human-readable description of itself. This allows the MessageReceiver to maintain a list of "seen" items.

type MessageHandler

type MessageHandler interface {
	// AddEnvironment is called when the stream has provided a configuration for an environment
	// that StreamManager has not seen before. This can happen due to either a "put" or a "patch".
	AddEnvironment(params envfactory.EnvironmentParams)

	// UpdateEnvironment is called when the stream has provided a new configuration for an
	// existing environment. This can happen due to either a "put" or a "patch".
	UpdateEnvironment(params envfactory.EnvironmentParams)

	// ReceivedAllEnvironments is called when StreamManager has received a "put" event and has
	// finished calling AddEnvironment or UpdateEnvironment for every environment in the list (and
	// DeleteEnvironment for any previously existing environments that are no longer in the list).
	// We use this at startup time to determine when Relay has acquired a complete configuration.
	ReceivedAllEnvironments()

	// DeleteEnvironment is called when an environment should be removed, due to either a "delete"
	// message, or a "put" that no longer contains that environment.
	DeleteEnvironment(id config.EnvironmentID)

	// KeyExpired is called when a key that was previously provided in EnvironmentParams.ExpiringSDKKey
	// has now expired. Relay should disconnect any clients currently using that key and reject any
	// future requests that use it.
	KeyExpired(id config.EnvironmentID, projKey string, oldKey config.SDKKey)

	// AddFilter is called whenever a new filter should be added, either in a "put" or "patch" message.
	AddFilter(params envfactory.FilterParams)

	// DeleteFilter is called whenever a filter should be deleted, via a "delete" message.
	DeleteFilter(id config.FilterID)
}

MessageHandler defines the methods that StreamManager will call when it receives messages from the auto-configuration stream.

type MessageReceiver

type MessageReceiver[T Item] struct {
	// contains filtered or unexported fields
}

MessageReceiver is responsible for transforming a potentially unreliable stream of SSE messages from LaunchDarkly into a reliable sequence of commands for other components. It isolates the state needed for those operations in a single place, so that other parts of the code can simply process commands in the order they arrive.

As a motivating example, suppose the following messages are received in order:

Message 1: {delete XYZ, version 2}
Message 2: {upsert XYZ, version 1}

This component must discard Message 2, since its version number is less than or equal to that of a previous message for the same item. The final result is that XYZ should not exist in the system.
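
The version gating in this example can be sketched with a small tombstone-keeping map. This is an illustrative reimplementation under assumptions, not the package's actual logic: the `receiver` type, the `entry` struct, the local `action` values, and the choice to report a re-insert after a delete as an insert are all hypothetical.

```go
package main

import "fmt"

// action is a hypothetical local stand-in for the package's Action type.
type action int

const (
	actionNoop action = iota
	actionInsert
	actionUpdate
	actionDelete
)

func (a action) String() string {
	return [...]string{"noop", "insert", "update", "delete"}[a]
}

// entry records the newest version seen for an ID, and whether that
// version was a delete (a tombstone).
type entry struct {
	version int
	deleted bool
}

type receiver struct{ seen map[string]entry }

func newReceiver() *receiver { return &receiver{seen: map[string]entry{}} }

// Upsert ignores stale versions; otherwise it reports insert or update.
func (r *receiver) Upsert(id string, version int) action {
	prev, known := r.seen[id]
	if known && version <= prev.version {
		return actionNoop
	}
	r.seen[id] = entry{version: version}
	if !known || prev.deleted {
		return actionInsert
	}
	return actionUpdate
}

// Delete ignores stale versions and always leaves a tombstone behind, so
// that a later upsert with an older version is discarded.
func (r *receiver) Delete(id string, version int) action {
	prev, known := r.seen[id]
	if known && version <= prev.version {
		return actionNoop
	}
	r.seen[id] = entry{version: version, deleted: true}
	if !known || prev.deleted {
		return actionNoop // nothing was inserted, so nothing to remove
	}
	return actionDelete
}

func main() {
	r := newReceiver()
	fmt.Println(r.Delete("XYZ", 2)) // prints: noop
	fmt.Println(r.Upsert("XYZ", 1)) // prints: noop
}
```

Both calls yield a no-op in this sketch: the delete has nothing to remove but records version 2, and the later upsert at version 1 is then rejected as stale.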

func NewMessageReceiver

func NewMessageReceiver[T Item](loggers ldlog.Loggers) *MessageReceiver[T]

func (*MessageReceiver[T]) Delete

func (v *MessageReceiver[T]) Delete(id string, version int) Action

Delete receives an item ID and version, conditionally forwarding the request to the underlying ItemReceiver.

func (*MessageReceiver[T]) Forget

func (v *MessageReceiver[T]) Forget(id string) Action

Forget causes MessageReceiver to behave as if the ID was never seen before. It may invoke the ItemReceiver's delete command if the item exists.

func (*MessageReceiver[T]) Purge

func (v *MessageReceiver[T]) Purge(predicate func(id string) bool) []string

Purge calls Forget on all ids for which the predicate returns true. Returns the IDs of all items that should be deleted.

func (*MessageReceiver[T]) Retain

func (v *MessageReceiver[T]) Retain(predicate func(id string) bool) []string

Retain keeps all ids for which the predicate returns true, and calls Forget on the rest. Returns the IDs of all items that should be deleted.
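
To make the relationship between Purge and Retain concrete, here is a simplified sketch over a plain map of seen IDs. It is an assumption-laden illustration rather than the package's code: `purge` and `retain` are hypothetical free functions, the map stands in for the receiver's internal state, and every forgotten ID is returned (the real methods return only IDs that should be deleted).

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// purge forgets every ID for which predicate returns true and returns the
// forgotten IDs in sorted order.
func purge(seen map[string]int, predicate func(id string) bool) []string {
	var removed []string
	for id := range seen {
		if predicate(id) {
			delete(seen, id) // deleting during range is safe in Go
			removed = append(removed, id)
		}
	}
	sort.Strings(removed)
	return removed
}

// retain is the complement of purge: it keeps the IDs that match.
func retain(seen map[string]int, predicate func(id string) bool) []string {
	return purge(seen, func(id string) bool { return !predicate(id) })
}

func main() {
	seen := map[string]int{"env-a": 1, "env-b": 2, "filter-x": 1}
	removed := retain(seen, func(id string) bool {
		return strings.HasPrefix(id, "env-")
	})
	fmt.Println(removed, len(seen)) // prints: [filter-x] 2
}
```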

func (*MessageReceiver[T]) Upsert

func (v *MessageReceiver[T]) Upsert(id string, item T, version int) Action

Upsert receives an item and version, conditionally forwarding it to the underlying ItemReceiver.

type PatchMessageData

type PatchMessageData struct {
	Path string          `json:"path"`
	Data json.RawMessage `json:"data"`
}

PatchMessageData is the JSON data for an SSE message that adds or updates a single environment.

type PutContent

type PutContent struct {
	// Environments is a map of environment representations.
	Environments map[config.EnvironmentID]envfactory.EnvironmentRep `json:"environments"`
	// Filters is a map of filter representations.
	Filters map[config.FilterID]envfactory.FilterRep `json:"filters"`
}

PutContent is the environment map within PutMessageData.

type PutMessageData

type PutMessageData struct {
	// Path is currently always "/" for this message type.
	Path string `json:"path"`

	// Data contains the environment map.
	Data PutContent `json:"data"`
}

PutMessageData is the JSON data for an SSE message that provides a full set of environments.
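
The nesting of PutMessageData and PutContent can be seen by decoding a sample payload. The sketch below uses local copies of the two structs and keeps each environment and filter as raw JSON, because the fields of EnvironmentRep and FilterRep are not shown here. The helper `parsePut` and the sample payload are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// putContent is a local copy of the documented shape, with the
// representations left undecoded as raw JSON.
type putContent struct {
	Environments map[string]json.RawMessage `json:"environments"`
	Filters      map[string]json.RawMessage `json:"filters"`
}

type putMessageData struct {
	Path string     `json:"path"`
	Data putContent `json:"data"`
}

// parsePut (hypothetical helper) decodes a "put" payload.
func parsePut(raw []byte) (putMessageData, error) {
	var msg putMessageData
	err := json.Unmarshal(raw, &msg)
	return msg, err
}

func main() {
	payload := []byte(`{
		"path": "/",
		"data": {
			"environments": {"env1": {}, "env2": {}},
			"filters": {"f1": {}}
		}
	}`)
	msg, err := parsePut(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// prints: / 2 1
	fmt.Println(msg.Path, len(msg.Data.Environments), len(msg.Data.Filters))
}
```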

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages the auto-configuration SSE stream.

That includes managing the stream connection itself (reconnecting as needed, the same as the SDK streams) and maintaining the last known state of information received from the stream, so that it can determine whether an update is really an update. It does this by checking version numbers and by diffing the contents of a "put" event against the previous state.

Relay provides an implementation of the MessageHandler interface which will be called for all changes that it needs to know about.
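
The idea of diffing a "put" against the previous state can be sketched as a comparison of two version maps. This is a simplified illustration, not StreamManager's implementation: `diffPut` is a hypothetical function, environments are reduced to an ID and a version number, and an entry counts as updated only when its version increases.

```go
package main

import (
	"fmt"
	"sort"
)

// diffPut compares the previously known versions with those in a new "put"
// and returns the IDs to add, update, and remove, each in sorted order.
func diffPut(prev, next map[string]int) (added, updated, removed []string) {
	for id, version := range next {
		old, ok := prev[id]
		switch {
		case !ok:
			added = append(added, id)
		case version > old:
			updated = append(updated, id)
		}
	}
	for id := range prev {
		if _, ok := next[id]; !ok {
			removed = append(removed, id)
		}
	}
	sort.Strings(added)
	sort.Strings(updated)
	sort.Strings(removed)
	return added, updated, removed
}

func main() {
	prev := map[string]int{"a": 1, "b": 1, "c": 3}
	next := map[string]int{"b": 2, "c": 3, "d": 1}
	added, updated, removed := diffPut(prev, next)
	fmt.Println(added, updated, removed) // prints: [d] [b] [a]
}
```

In terms of the MessageHandler interface, the three result lists would correspond to AddEnvironment, UpdateEnvironment, and DeleteEnvironment calls, followed by ReceivedAllEnvironments.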

func NewStreamManager

func NewStreamManager(
	key config.AutoConfigKey,
	streamURI *url.URL,
	handler MessageHandler,
	httpConfig httpconfig.HTTPConfig,
	initialRetryDelay time.Duration,
	protocolVersion int,
	loggers ldlog.Loggers,
) *StreamManager

NewStreamManager creates a StreamManager, but does not start the connection.

func (*StreamManager) Close

func (s *StreamManager) Close()

Close permanently shuts down the stream.

func (*StreamManager) IgnoreExpiringSDKKey

func (s *StreamManager) IgnoreExpiringSDKKey(env envfactory.EnvironmentRep) bool

IgnoreExpiringSDKKey implements the EnvironmentMsgAdapter's KeyChecker interface. Its main purpose is to start a goroutine that triggers SDK key expiration when the EnvironmentRep specifies an expiring key. It also returns true if an ExpiringSDKKey should be ignored because its expiry is stale.

func (*StreamManager) Start

func (s *StreamManager) Start() <-chan error

Start causes the StreamManager to start trying to connect to the auto-config stream. The returned channel receives nil for a successful connection, or an error if it has permanently failed.
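
A caller will typically block on the returned channel during startup. The sketch below shows one way to do that with a timeout. It does not construct a real StreamManager: `waitForStart`, `errTimeout`, and the `fakeReady` channel are hypothetical, and the channel merely imitates the documented contract (nil on success, an error on permanent failure).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is returned when no result arrives in time (hypothetical).
var errTimeout = errors.New("timed out waiting for auto-config stream")

// waitForStart waits for the first value on ready, or gives up after timeout.
func waitForStart(ready <-chan error, timeout time.Duration) error {
	select {
	case err := <-ready:
		return err // nil means the stream connected successfully
	case <-time.After(timeout):
		return errTimeout
	}
}

func main() {
	// fakeReady stands in for the channel returned by StreamManager.Start.
	fakeReady := make(chan error, 1)
	fakeReady <- nil

	if err := waitForStart(fakeReady, time.Second); err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	fmt.Println("auto-config stream connected")
}
```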
