Documentation ¶
Overview ¶
Package autoconfig contains a client for the auto-configuration streaming service.
This abstracts away the protocol details so that the Relay implementation only needs to know what environments it should add, update, or remove.
Index ¶
- Constants
- type Action
- type DeleteMessageData
- type Item
- type MessageHandler
- type MessageReceiver
- func (v *MessageReceiver[T]) Delete(id string, version int) Action
- func (v *MessageReceiver[T]) Forget(id string) Action
- func (v *MessageReceiver[T]) Purge(predicate func(id string) bool) []string
- func (v *MessageReceiver[T]) Retain(predicate func(id string) bool) []string
- func (v *MessageReceiver[T]) Upsert(id string, item T, version int) Action
- type PatchMessageData
- type PutContent
- type PutMessageData
- type StreamManager
Constants ¶
const ( ActionInsert = Action("insert") ActionUpdate = Action("updated") ActionDelete = Action("delete") ActionNoop = Action("noop") )
const ( // PutEvent is the SSE event name corresponding to PutMessageData. PutEvent = "put" // PatchEvent is the SSE event name corresponding to PatchMessageData. PatchEvent = "patch" // DeleteEvent is the SSE event name corresponding to DeleteMessageData. DeleteEvent = "delete" // ReconnectEvent is the SSE event name for a message that forces a stream reconnect. ReconnectEvent = "reconnect" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action string
An Action represents actions generated in response to invoking MessageReceiver's methods, which a caller should handle. Returned Actions will obey the following order constraints:
- An item will never be Updated or Deleted without first being Inserted.
- An item may be Updated 0 or more times.
- An item will be Deleted exactly once.
At any point, a Noop may be emitted, in which case the caller may take no action.
type DeleteMessageData ¶
type DeleteMessageData struct { // Path is currently always "environments/$ENVID". Path string `json:"path"` // Version is a number that must be greater than the last known version of the environment. Version int `json:"version"` }
DeleteMessageData is the JSON data for an SSE message that removes an environment.
type Item ¶
type Item interface { // Describe is the human-readable identifier for the item, used in log messages. Describe() string }
An Item is anything that can report its own ID and a human-readable description of itself. Its allows the MessageReceiver to maintain a list of "seen" items.
type MessageHandler ¶
type MessageHandler interface { // AddEnvironment is called when the stream has provided a configuration for an environment // that StreamManager has not seen before. This can happen due to either a "put" or a "patch". AddEnvironment(params envfactory.EnvironmentParams) // UpdateEnvironment is called when the stream has provided a new configuration for an // existing environment. This can happen due to either a "put" or a "patch". UpdateEnvironment(params envfactory.EnvironmentParams) // ReceivedAllEnvironments is called when StreamManager has received a "put" event and has // finished calling AddEnvironment or UpdateEnvironment for every environment in the list (and // DeleteEnvironment for any previously existing environments that are no longer in the list). // We use this at startup time to determine when Relay has acquired a complete configuration. ReceivedAllEnvironments() // DeleteEnvironment is called when an environment should be removed, due to either a "delete" // message, or a "put" that no longer contains that environment. DeleteEnvironment(id config.EnvironmentID) // KeyExpired is called when a key that was previously provided in EnvironmentParams.ExpiringSDKKey // has now expired. Relay should disconnect any clients currently using that key and reject any // future requests that use it. KeyExpired(id config.EnvironmentID, projKey string, oldKey config.SDKKey) // AddFilter is called whenever a new filter should be added, either in a "put" or "patch" message. AddFilter(params envfactory.FilterParams) // DeleteFilter is called whenever a filter should be deleted, via a "delete" message. DeleteFilter(id config.FilterID) }
MessageHandler defines the methods that StreamManager will call when it receives messages from the auto-configuration stream.
type MessageReceiver ¶
type MessageReceiver[T Item] struct { // contains filtered or unexported fields }
MessageReceiver is responsible for transforming a potentially unreliable stream of SSE message from LaunchDarkly into a reliable sequence of commands for other components. It serves to isolate the state needed for those operations into a single place, so that other parts of the code can simply process commands in the order they arrive.
As a motivating example, the following messages are received in order:
Message 1: {delete XYZ, version 2} Message 2: {upsert XYZ, version 1}
This component must discard Message 2, since its version number is <= than a previous message. The final result is that XYZ should not exist in the system.
func NewMessageReceiver ¶
func NewMessageReceiver[T Item](loggers ldlog.Loggers) *MessageReceiver[T]
func (*MessageReceiver[T]) Delete ¶
func (v *MessageReceiver[T]) Delete(id string, version int) Action
Delete receives an item ID and version, conditionally forwarding the request to the underlying ItemReceiver.
func (*MessageReceiver[T]) Forget ¶
func (v *MessageReceiver[T]) Forget(id string) Action
Forget causes MessageReceiver to behave as if the ID was never seen before. It may invoke the ItemReceiver's delete command if the item exists.
func (*MessageReceiver[T]) Purge ¶
func (v *MessageReceiver[T]) Purge(predicate func(id string) bool) []string
Purge calls Forget on all ids for which the predicate returns true. Returns the IDs of all items that should be deleted.
type PatchMessageData ¶
type PatchMessageData struct { Path string `json:"path"` Data json.RawMessage `json:"data"` }
PatchMessageData is the JSON data for an SSE message that adds or updates a single environment.
type PutContent ¶
type PutContent struct { // Environments is a map of environment representations. Environments map[config.EnvironmentID]envfactory.EnvironmentRep `json:"environments"` // Filters is a map of filter representations. Filters map[config.FilterID]envfactory.FilterRep `json:"filters"` }
PutContent is the environent map within PutMessageData.
type PutMessageData ¶
type PutMessageData struct { // Path is currently always "/" for this message type. Path string `json:"path"` // Data contains the environment map. Data PutContent `json:"data"` }
PutMessageData is the JSON data for an SSE message that provides a full set of environments.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages the auto-configuration SSE stream.
That includes managing the stream connection itself (reconnecting as needed, the same as the SDK streams), and also maintaining the last known state of information received from the stream so that it can determine whether an update is really an update (that is, checking version numbers and diffing the contents of a "put" event against the previous state).
Relay provides an implementation of the MessageHandler interface which will be called for all changes that it needs to know about.
func NewStreamManager ¶
func NewStreamManager( key config.AutoConfigKey, streamURI *url.URL, handler MessageHandler, httpConfig httpconfig.HTTPConfig, initialRetryDelay time.Duration, protocolVersion int, loggers ldlog.Loggers, ) *StreamManager
NewStreamManager creates a StreamManager, but does not start the connection.
func (*StreamManager) Close ¶
func (s *StreamManager) Close()
Close permanently shuts down the stream.
func (*StreamManager) IgnoreExpiringSDKKey ¶
func (s *StreamManager) IgnoreExpiringSDKKey(env envfactory.EnvironmentRep) bool
IgnoreExpiringSDKKey implements the EnvironmentMsgAdapter's KeyChecker interface. Its main purpose is to create a goroutine that triggers SDK key expiration, if the EnvironmentRep specifies that. Additionally, it returns true if an ExpiringSDKKey should be ignored (since the expiry is stale).
func (*StreamManager) Start ¶
func (s *StreamManager) Start() <-chan error
Start causes the StreamManager to start trying to connect to the auto-config stream. The returned channel receives nil for a successful connection, or an error if it has permanently failed.