Documentation
¶
Overview ¶
Package streams defines the Relay core abstractions for implementing streaming endpoints, and provides some standard implementations for those endpoints.
This package is in core rather than internal because Relay distributions that define any new streaming behaviors will need to use this package.
Index ¶
- func MakePingEvent() eventsource.Event
- func MakeServerSideDeleteEvent(kind ldstoretypes.DataKind, key string, version int) eventsource.Event
- func MakeServerSideFlagsOnlyDeleteEvent(key string, version int) eventsource.Event
- func MakeServerSideFlagsOnlyPatchEvent(key string, item ldstoretypes.ItemDescriptor) eventsource.Event
- func MakeServerSideFlagsOnlyPutEvent(allData []ldstoretypes.Collection) eventsource.Event
- func MakeServerSidePatchEvent(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) eventsource.Event
- func MakeServerSidePutEvent(allData []ldstoretypes.Collection) eventsource.Event
- type EnvStoreQueries
- type EnvStreamProvider
- type EnvStreamUpdates
- type EnvStreams
- func (es *EnvStreams) AddCredential(credential config.SDKCredential)
- func (es *EnvStreams) Close() error
- func (es *EnvStreams) RemoveCredential(credential config.SDKCredential)
- func (es *EnvStreams) SendAllDataUpdate(allData []ldstoretypes.Collection)
- func (es *EnvStreams) SendSingleItemUpdate(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor)
- type StreamProvider
- func NewJSClientPingStreamProvider(maxConnTime time.Duration) StreamProvider
- func NewMobilePingStreamProvider(maxConnTime time.Duration) StreamProvider
- func NewServerSideFlagsOnlyStreamProvider(maxConnTime time.Duration) StreamProvider
- func NewServerSideStreamProvider(maxConnTime time.Duration) StreamProvider
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MakePingEvent ¶
func MakePingEvent() eventsource.Event
MakePingEvent creates a "ping" event for client-side SDKs.
func MakeServerSideDeleteEvent ¶
func MakeServerSideDeleteEvent(kind ldstoretypes.DataKind, key string, version int) eventsource.Event
MakeServerSideDeleteEvent creates a "delete" event for server-side SDKs.
func MakeServerSideFlagsOnlyDeleteEvent ¶
func MakeServerSideFlagsOnlyDeleteEvent(key string, version int) eventsource.Event
MakeServerSideFlagsOnlyDeleteEvent creates a "delete" event for old server-side SDKs that use the flags-only stream.
func MakeServerSideFlagsOnlyPatchEvent ¶
func MakeServerSideFlagsOnlyPatchEvent(key string, item ldstoretypes.ItemDescriptor) eventsource.Event
MakeServerSideFlagsOnlyPatchEvent creates a "patch" event for old server-side SDKs that use the flags-only stream.
func MakeServerSideFlagsOnlyPutEvent ¶
func MakeServerSideFlagsOnlyPutEvent(allData []ldstoretypes.Collection) eventsource.Event
MakeServerSideFlagsOnlyPutEvent creates a "put" event for old server-side SDKs that use the flags-only stream.
func MakeServerSidePatchEvent ¶
func MakeServerSidePatchEvent( kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor, ) eventsource.Event
MakeServerSidePatchEvent creates a "patch" event for server-side SDKs.
func MakeServerSidePutEvent ¶
func MakeServerSidePutEvent(allData []ldstoretypes.Collection) eventsource.Event
MakeServerSidePutEvent creates a "put" event for server-side SDKs.
Types ¶
type EnvStoreQueries ¶
type EnvStoreQueries interface { IsInitialized() bool GetAll(ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) }
EnvStoreQueries is a subset of DataStore methods that are used by EnvStreams to query existing data from the store, for generating "put" events.
type EnvStreamProvider ¶
type EnvStreamProvider interface { EnvStreamUpdates // SendAllDataUpdate, SendSingleItemUpdate // SendHeartbeat sends keep-alive data on the stream. SendHeartbeat() // Close releases all resources for this EnvStreamProvider and closes all connections to it. Close() }
EnvStreamProvider is an abstraction of publishing events to a stream for a specific environment. Implementations of this interface are created by StreamProvider.Register().
type EnvStreamUpdates ¶
type EnvStreamUpdates interface { SendAllDataUpdate(allData []ldstoretypes.Collection) SendSingleItemUpdate(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) }
EnvStreamUpdates is an interface representing the kinds of updates we can publish to streams. Other components that publish updates to EnvStreams should use this interface rather than the implementation type, both to clarify that they don't need other EnvStreams functionality and to simplify testing.
type EnvStreams ¶
type EnvStreams struct {
// contains filtered or unexported fields
}
EnvStreams encapsulates streaming behavior for a specific environment.
EnvStreams itself does not know anything about what kind of streams are available; those are determined by the StreamProvider instances that are passed in the constructor, and the credentials that are registered with AddCredential. For each combination of a credential and a StreamProvider that can handle that credential, a stream is available, and data updates that are sent with the EnvStreamUpdates methods will be rebroadcast to all of those streams, in a format that is determined by each StreamProvider.
func NewEnvStreams ¶
func NewEnvStreams( streamProviders []StreamProvider, storeQueries EnvStoreQueries, heartbeatInterval time.Duration, loggers ldlog.Loggers, ) *EnvStreams
NewEnvStreams creates an instance of EnvStreams.
func (*EnvStreams) AddCredential ¶
func (es *EnvStreams) AddCredential(credential config.SDKCredential)
AddCredential adds an environment credential and creates a corresponding EnvStreamProvider.
func (*EnvStreams) Close ¶
func (es *EnvStreams) Close() error
Close shuts down all currently active streams for this environment and releases its resources.
func (*EnvStreams) RemoveCredential ¶
func (es *EnvStreams) RemoveCredential(credential config.SDKCredential)
RemoveCredential shuts down the EnvStreamProvider, if any, for the specified credential.
func (*EnvStreams) SendAllDataUpdate ¶
func (es *EnvStreams) SendAllDataUpdate( allData []ldstoretypes.Collection, )
SendAllDataUpdate sends all appropriate stream updates for when the full data set has been refreshed.
func (*EnvStreams) SendSingleItemUpdate ¶
func (es *EnvStreams) SendSingleItemUpdate( kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor, )
SendSingleItemUpdate sends all appropriate stream updates for when an individual item has been updated.
type StreamProvider ¶
type StreamProvider interface { // Handler returns an HTTP request handler for the given credential. It can return nil if it does // not support this type of credential. Handler(credential config.SDKCredential) http.HandlerFunc // Register tells the StreamProvider about an environment that it should support, and returns an // implementation of EnvStreamsUpdates for pushing updates related to that environment. It can // return nil if it does not support this type of credential. Register(credential config.SDKCredential, store EnvStoreQueries, loggers ldlog.Loggers) EnvStreamProvider // Close tells the StreamProvider to release all of its resources and close all connections. Close() }
StreamProvider is an abstraction of a specific kind of SSE event stream, such as the server-side SDK "/all" stream. The streams package provides default implementations of this interface for the streams that are supported by the standard Relay Proxy.
Each StreamProvider expects a specific kind of SDKCredential, e.g. config.SDKKey for the server-side streams. If the wrong kind of credential is passed, it should behave as it would for an unrecognized key. It is important that there can be more than one StreamProvider for a given credential.
func NewJSClientPingStreamProvider ¶
func NewJSClientPingStreamProvider(maxConnTime time.Duration) StreamProvider
NewJSClientPingStreamProvider creates a StreamProvider implementation for JS client-side streaming endpoints, which will generate only "ping" events.
This is identical to NewMobilePingStreamProvider except that it only handles requests authenticated with an environment ID.
func NewMobilePingStreamProvider ¶
func NewMobilePingStreamProvider(maxConnTime time.Duration) StreamProvider
NewMobilePingStreamProvider creates a StreamProvider implementation for mobile streaming endpoints, which will generate only "ping" events.
This is identical to NewJSClientPingStreamProvider except that it only handles requests authenticated with a mobile key.
func NewServerSideFlagsOnlyStreamProvider ¶
func NewServerSideFlagsOnlyStreamProvider(maxConnTime time.Duration) StreamProvider
NewServerSideFlagsOnlyStreamProvider creates a StreamProvider implementation for the server-side SDK "/flags" endpoint, which is used only by old SDKs that do not support segments.
func NewServerSideStreamProvider ¶
func NewServerSideStreamProvider(maxConnTime time.Duration) StreamProvider
NewServerSideStreamProvider creates a StreamProvider implementation for the server-side SDK "/all" endpoint, which is used by all current server-side SDK versions.