streams

package
v8.10.1
Warning

This package is not in the latest version of its module.

Published: Oct 23, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package streams defines the Relay core abstractions for implementing streaming endpoints, and provides some standard implementations for those endpoints.

Constants

This section is empty.

Variables

This section is empty.

Functions

func MakePingEvent

func MakePingEvent() eventsource.Event

MakePingEvent creates a "ping" event for client-side SDKs.

func MakeServerSideDeleteEvent

func MakeServerSideDeleteEvent(kind ldstoretypes.DataKind, key string, version int) eventsource.Event

MakeServerSideDeleteEvent creates a "delete" event for server-side SDKs.

func MakeServerSideFlagsOnlyDeleteEvent

func MakeServerSideFlagsOnlyDeleteEvent(key string, version int) eventsource.Event

MakeServerSideFlagsOnlyDeleteEvent creates a "delete" event for old server-side SDKs that use the flags-only stream.

func MakeServerSideFlagsOnlyPatchEvent

func MakeServerSideFlagsOnlyPatchEvent(key string, item ldstoretypes.ItemDescriptor) eventsource.Event

MakeServerSideFlagsOnlyPatchEvent creates a "patch" event for old server-side SDKs that use the flags-only stream.

func MakeServerSideFlagsOnlyPutEvent

func MakeServerSideFlagsOnlyPutEvent(allData []ldstoretypes.Collection) eventsource.Event

MakeServerSideFlagsOnlyPutEvent creates a "put" event for old server-side SDKs that use the flags-only stream.

func MakeServerSidePatchEvent

func MakeServerSidePatchEvent(
	kind ldstoretypes.DataKind,
	key string,
	item ldstoretypes.ItemDescriptor,
) eventsource.Event

MakeServerSidePatchEvent creates a "patch" event for server-side SDKs.

func MakeServerSidePutEvent

func MakeServerSidePutEvent(allData []ldstoretypes.Collection) eventsource.Event

MakeServerSidePutEvent creates a "put" event for server-side SDKs.
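As a rough illustration of the kind of value these constructors return, the sketch below builds a "delete" event and renders it in Server-Sent Events wire format. The Event interface mirrors the three-method shape of eventsource.Event. The deleteEvent type, the writeSSE helper, and the JSON payload layout (path and version fields) are assumptions made for illustration, not the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Event mirrors the method set of eventsource.Event.
type Event interface {
	Id() string
	Event() string
	Data() string
}

// deleteEvent is a hypothetical stand-in for what a "delete" constructor might return.
// The payload fields are an assumption of this sketch.
type deleteEvent struct {
	Path    string `json:"path"`
	Version int    `json:"version"`
}

func (e deleteEvent) Id() string    { return "" }
func (e deleteEvent) Event() string { return "delete" }
func (e deleteEvent) Data() string {
	b, err := json.Marshal(e)
	if err != nil {
		return "{}"
	}
	return string(b)
}

// writeSSE renders an event as SSE text: optional id, event name, one "data:" line
// per line of payload, then a blank line that terminates the event.
func writeSSE(ev Event) string {
	var sb strings.Builder
	if id := ev.Id(); id != "" {
		fmt.Fprintf(&sb, "id: %s\n", id)
	}
	fmt.Fprintf(&sb, "event: %s\n", ev.Event())
	for _, line := range strings.Split(ev.Data(), "\n") {
		fmt.Fprintf(&sb, "data: %s\n", line)
	}
	sb.WriteString("\n")
	return sb.String()
}

func main() {
	fmt.Print(writeSSE(deleteEvent{Path: "/flags/my-flag", Version: 3}))
}
```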

Types

type EnvStoreQueries

type EnvStoreQueries interface {
	IsInitialized() bool
	GetAll(ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error)
}

EnvStoreQueries is a subset of DataStore methods that are used by EnvStreams to query existing data from the store, for generating "put" events.
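Because the interface has only two methods, it is easy to satisfy with an in-memory store in tests. The sketch below is one way that might look. DataKind, ItemDescriptor, and KeyedItemDescriptor are simplified local stand-ins for the ldstoretypes types, and memoryStore and snapshot are hypothetical names that do not exist in the package.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the ldstoretypes types (assumptions of this sketch).
type DataKind string

type ItemDescriptor struct {
	Version int
	Item    any
}

type KeyedItemDescriptor struct {
	Key  string
	Item ItemDescriptor
}

// EnvStoreQueries mirrors the documented interface, using the stand-in types.
type EnvStoreQueries interface {
	IsInitialized() bool
	GetAll(DataKind) ([]KeyedItemDescriptor, error)
}

// memoryStore is a hypothetical in-memory implementation for tests.
type memoryStore struct {
	initialized bool
	items       map[DataKind]map[string]ItemDescriptor
}

func (s *memoryStore) IsInitialized() bool { return s.initialized }

func (s *memoryStore) GetAll(kind DataKind) ([]KeyedItemDescriptor, error) {
	out := make([]KeyedItemDescriptor, 0, len(s.items[kind]))
	for key, item := range s.items[kind] {
		out = append(out, KeyedItemDescriptor{Key: key, Item: item})
	}
	// Sort by key so that results are deterministic despite map iteration order.
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out, nil
}

// snapshot gathers the data a "put" event would need. The bool result is false
// when the store has not been initialized yet.
func snapshot(store EnvStoreQueries, kinds ...DataKind) (map[DataKind][]KeyedItemDescriptor, bool, error) {
	if !store.IsInitialized() {
		return nil, false, nil
	}
	data := make(map[DataKind][]KeyedItemDescriptor, len(kinds))
	for _, kind := range kinds {
		items, err := store.GetAll(kind)
		if err != nil {
			return nil, false, err
		}
		data[kind] = items
	}
	return data, true, nil
}

func main() {
	store := &memoryStore{
		initialized: true,
		items: map[DataKind]map[string]ItemDescriptor{
			"features": {"flag-b": {Version: 2}, "flag-a": {Version: 1}},
		},
	}
	data, ok, err := snapshot(store, "features", "segments")
	fmt.Println(ok, err, len(data["features"]), len(data["segments"]))
}
```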

type EnvStreamProvider

type EnvStreamProvider interface {
	EnvStreamUpdates // SendAllDataUpdate, SendSingleItemUpdate, InvalidateClientSideState

	// SendHeartbeat sends keep-alive data on the stream.
	SendHeartbeat()

	// Close releases all resources for this EnvStreamProvider and closes all connections to it.
	Close()
}

EnvStreamProvider is an abstraction of publishing events to a stream for a specific environment. Implementations of this interface are created by StreamProvider.Register().

type EnvStreamUpdates

type EnvStreamUpdates interface {
	// SendAllDataUpdate signals an update to the entire SDK data set. The specified data items will be
	// broadcast to all connected server-side SDKs in a "put" event, and all connected client-side SDKs
	// will receive a "ping" event to refresh their state.
	SendAllDataUpdate(allData []ldstoretypes.Collection)

	// SendSingleItemUpdate signals an update to an individual SDK data item (flag or segment). The
	// specified data item will be broadcast to all connected server-side SDKs in a "patch" event, and
	// all connected client-side SDKs will receive a "ping" event to refresh their state.
	SendSingleItemUpdate(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor)

	// InvalidateClientSideState signals an update that could affect client-side evaluations, but does
	// not change any server-side data item. Updating big segment data is such an event, since the
	// segment in the regular server-side SDK data does not change but evaluating the segment could
	// now produce a different result. Nothing is broadcast to the server-side SDKs (since they have
	// their own mechanisms for detecting big segment updates), but all connected client-side SDKs will
	// receive a "ping" event to refresh their state.
	InvalidateClientSideState()
}

EnvStreamUpdates is an interface representing the kinds of updates we can publish to streams. Other components that publish updates to EnvStreams should use this interface rather than the implementation type, both to clarify that they don't need other EnvStreams functionality and to simplify testing.
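To illustrate the testing benefit, the sketch below defines a recording fake that satisfies the interface and a small publisher that depends only on it. The data types are simplified local stand-ins for the ldstoretypes types, and recordingUpdates and bigSegmentSync are hypothetical names introduced for this example.

```go
package main

import "fmt"

// Simplified stand-ins for the ldstoretypes types (assumptions of this sketch).
type DataKind string

type ItemDescriptor struct {
	Version int
	Item    any
}

type KeyedItemDescriptor struct {
	Key  string
	Item ItemDescriptor
}

type Collection struct {
	Kind  DataKind
	Items []KeyedItemDescriptor
}

// EnvStreamUpdates mirrors the documented interface, using the stand-in types.
type EnvStreamUpdates interface {
	SendAllDataUpdate(allData []Collection)
	SendSingleItemUpdate(kind DataKind, key string, item ItemDescriptor)
	InvalidateClientSideState()
}

// recordingUpdates is a hypothetical test double that logs each call it receives.
type recordingUpdates struct{ calls []string }

func (r *recordingUpdates) SendAllDataUpdate(allData []Collection) {
	r.calls = append(r.calls, fmt.Sprintf("all:%d", len(allData)))
}

func (r *recordingUpdates) SendSingleItemUpdate(kind DataKind, key string, item ItemDescriptor) {
	r.calls = append(r.calls, fmt.Sprintf("item:%s/%s@%d", kind, key, item.Version))
}

func (r *recordingUpdates) InvalidateClientSideState() {
	r.calls = append(r.calls, "invalidate")
}

// bigSegmentSync is a hypothetical publisher. It needs only the update methods,
// so it accepts the interface rather than a concrete EnvStreams.
type bigSegmentSync struct{ updates EnvStreamUpdates }

func (b bigSegmentSync) onSegmentsChanged() { b.updates.InvalidateClientSideState() }

func main() {
	rec := &recordingUpdates{}
	bigSegmentSync{updates: rec}.onSegmentsChanged()
	rec.SendSingleItemUpdate("features", "my-flag", ItemDescriptor{Version: 7})
	fmt.Println(rec.calls) // prints [invalidate item:features/my-flag@7]
}
```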

type EnvStreams

type EnvStreams struct {
	// contains filtered or unexported fields
}

EnvStreams encapsulates streaming behavior for a specific environment.

EnvStreams itself does not know anything about what kind of streams are available; those are determined by the StreamProvider instances that are passed in the constructor, and the credentials that are registered with AddCredential. For each combination of a credential and a StreamProvider that can handle that credential, a stream is available, and data updates that are sent with the EnvStreamUpdates methods will be rebroadcast to all of those streams, in a format that is determined by each StreamProvider.
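The credential-by-provider fan-out described above can be sketched with a few local types. Everything in this example is hypothetical: the credential kinds ("sdk-key", "env-id"), the provider names, and the lower-case types are stand-ins chosen for illustration and do not reflect the real implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// credential is a stand-in for an SDK credential; kind tells providers whether
// they can handle it.
type credential struct {
	kind  string
	value string
}

// envStream represents one active stream for a credential/provider pair.
type envStream struct {
	label           string
	fullUpdateEvent string // event this stream emits for a full data update
}

// provider is a stand-in for StreamProvider. register returns nil when the
// credential is of a kind the provider does not support.
type provider struct {
	name            string
	accepts         string
	fullUpdateEvent string
}

func (p provider) register(c credential) *envStream {
	if c.kind != p.accepts {
		return nil
	}
	return &envStream{label: p.name + "/" + c.value, fullUpdateEvent: p.fullUpdateEvent}
}

// envStreams keeps one stream per (credential, supporting provider) combination.
type envStreams struct {
	providers []provider
	active    map[credential][]*envStream
}

func newEnvStreams(providers []provider) *envStreams {
	return &envStreams{providers: providers, active: map[credential][]*envStream{}}
}

func (es *envStreams) addCredential(c credential) {
	for _, p := range es.providers {
		if s := p.register(c); s != nil {
			es.active[c] = append(es.active[c], s)
		}
	}
}

func (es *envStreams) removeCredential(c credential) { delete(es.active, c) }

// sendAllDataUpdate rebroadcasts to every active stream, each in its own format,
// and returns a sorted description of what was sent.
func (es *envStreams) sendAllDataUpdate() []string {
	var sent []string
	for _, streams := range es.active {
		for _, s := range streams {
			sent = append(sent, s.label+" <- "+s.fullUpdateEvent)
		}
	}
	sort.Strings(sent)
	return sent
}

func main() {
	es := newEnvStreams([]provider{
		{name: "server-side-all", accepts: "sdk-key", fullUpdateEvent: "put"},
		{name: "server-side-flags-only", accepts: "sdk-key", fullUpdateEvent: "put"},
		{name: "client-side-ping", accepts: "env-id", fullUpdateEvent: "ping"},
	})
	es.addCredential(credential{kind: "sdk-key", value: "sdk-123"})
	es.addCredential(credential{kind: "env-id", value: "env-456"})
	for _, line := range es.sendAllDataUpdate() {
		fmt.Println(line)
	}
}
```

In this sketch a single SDK key yields two streams because two providers accept it, while the environment ID yields one.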

func NewEnvStreams

func NewEnvStreams(
	streamProviders []StreamProvider,
	storeQueries EnvStoreQueries,
	heartbeatInterval time.Duration,
	filterKey config.FilterKey,
	loggers ldlog.Loggers,
) *EnvStreams

NewEnvStreams creates an instance of EnvStreams.

func (*EnvStreams) AddCredential

func (es *EnvStreams) AddCredential(credential credential.SDKCredential)

AddCredential adds an environment keyed off the combination of credential and payload filter, and creates a corresponding EnvStreamProvider.

func (*EnvStreams) Close

func (es *EnvStreams) Close() error

Close shuts down all currently active streams for this environment and releases its resources.

func (*EnvStreams) InvalidateClientSideState

func (es *EnvStreams) InvalidateClientSideState()

InvalidateClientSideState sends all appropriate stream updates for when client-side state should be refreshed.

func (*EnvStreams) RemoveCredential

func (es *EnvStreams) RemoveCredential(credential credential.SDKCredential)

RemoveCredential shuts down the EnvStreamProvider, if any, specified by a combination of credential and payload filter key.

func (*EnvStreams) SendAllDataUpdate

func (es *EnvStreams) SendAllDataUpdate(
	allData []ldstoretypes.Collection,
)

SendAllDataUpdate sends all appropriate stream updates for when the full data set has been refreshed.

func (*EnvStreams) SendSingleItemUpdate

func (es *EnvStreams) SendSingleItemUpdate(
	kind ldstoretypes.DataKind,
	key string,
	item ldstoretypes.ItemDescriptor,
)

SendSingleItemUpdate sends all appropriate stream updates for when an individual item has been updated.

type StreamProvider

type StreamProvider interface {
	// Handler returns an HTTP request handler for the given scoped SDK credential.
	// It can return nil if it does not support this type of credential.
	Handler(credential sdkauth.ScopedCredential) http.HandlerFunc

	// Register tells the StreamProvider about an environment that it should support, and returns an
	// EnvStreamProvider for pushing updates related to that environment. It can return nil if it
	// does not support this type of credential.
	Register(credential sdkauth.ScopedCredential, store EnvStoreQueries, loggers ldlog.Loggers) EnvStreamProvider

	// Close tells the StreamProvider to release all of its resources and close all connections.
	Close()
}

StreamProvider is an abstraction of a specific kind of SSE stream, such as the server-side SDK "/all" stream. The streams package provides default implementations of this interface for the streams that are supported by the standard Relay Proxy.

Each StreamProvider expects a specific kind of SDKCredential, e.g. config.SDKKey for the server-side streams. If the wrong kind of credential is passed, it should behave as it would for an unrecognized key. Note that there can be more than one StreamProvider for a given credential.

func NewStreamProvider

func NewStreamProvider(kind basictypes.StreamKind, maxConnTime time.Duration) StreamProvider

NewStreamProvider creates a StreamProvider implementation for the specified kind of stream endpoint.
