streams

package
v6.0.2
Warning

This package is not in the latest version of its module.

Published: Oct 20, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package streams defines the Relay core abstractions for implementing streaming endpoints, and provides some standard implementations for those endpoints.

This package is in core rather than internal because Relay distributions that define any new streaming behaviors will need to use this package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MakePingEvent

func MakePingEvent() eventsource.Event

MakePingEvent creates a "ping" event for client-side SDKs.

func MakeServerSideDeleteEvent

func MakeServerSideDeleteEvent(kind ldstoretypes.DataKind, key string, version int) eventsource.Event

MakeServerSideDeleteEvent creates a "delete" event for server-side SDKs.
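As a rough illustration of what such an event might carry, the sketch below builds a "delete" event from a namespace, a key, and a version. The deleteEvent type, its JSON field names, the path layout, and the plain-string namespace (standing in for ldstoretypes.DataKind) are all assumptions made for this example; the page above does not specify the payload format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deleteEvent is a hypothetical stand-in for the value returned by
// MakeServerSideDeleteEvent. Field names and JSON shape are assumptions.
type deleteEvent struct {
	Path    string `json:"path"`
	Version int    `json:"version"`
}

// Id, Event, and Data follow the usual three-part shape of an SSE event:
// an optional ID, an event name, and a data payload.
func (e deleteEvent) Id() string    { return "" }
func (e deleteEvent) Event() string { return "delete" }
func (e deleteEvent) Data() string {
	b, _ := json.Marshal(e) // a struct of string and int cannot fail to marshal
	return string(b)
}

// makeDeleteEvent builds a delete event for an item in the given namespace
// (for example "flags" or "segments"). The namespace string stands in for
// ldstoretypes.DataKind.
func makeDeleteEvent(namespace, key string, version int) deleteEvent {
	return deleteEvent{Path: "/" + namespace + "/" + key, Version: version}
}

func main() {
	ev := makeDeleteEvent("flags", "my-flag", 7)
	fmt.Printf("event: %s\ndata: %s\n", ev.Event(), ev.Data())
}
```

Carrying the version lets a receiver ignore a delete that is older than the data it already holds.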

func MakeServerSideFlagsOnlyDeleteEvent

func MakeServerSideFlagsOnlyDeleteEvent(key string, version int) eventsource.Event

MakeServerSideFlagsOnlyDeleteEvent creates a "delete" event for old server-side SDKs that use the flags-only stream.

func MakeServerSideFlagsOnlyPatchEvent

func MakeServerSideFlagsOnlyPatchEvent(key string, item ldstoretypes.ItemDescriptor) eventsource.Event

MakeServerSideFlagsOnlyPatchEvent creates a "patch" event for old server-side SDKs that use the flags-only stream.

func MakeServerSideFlagsOnlyPutEvent

func MakeServerSideFlagsOnlyPutEvent(allData []ldstoretypes.Collection) eventsource.Event

MakeServerSideFlagsOnlyPutEvent creates a "put" event for old server-side SDKs that use the flags-only stream.

func MakeServerSidePatchEvent

func MakeServerSidePatchEvent(
	kind ldstoretypes.DataKind,
	key string,
	item ldstoretypes.ItemDescriptor,
) eventsource.Event

MakeServerSidePatchEvent creates a "patch" event for server-side SDKs.

func MakeServerSidePutEvent

func MakeServerSidePutEvent(allData []ldstoretypes.Collection) eventsource.Event

MakeServerSidePutEvent creates a "put" event for server-side SDKs.

Types

type EnvStoreQueries

type EnvStoreQueries interface {
	IsInitialized() bool
	GetAll(ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error)
}

EnvStoreQueries is a subset of DataStore methods that are used by EnvStreams to query existing data from the store, for generating "put" events.
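The following sketch shows one way a stream might use these two methods to gather the data for a "put" event. It uses simplified local stand-ins (storeQueries, keyedItem, and string kinds) rather than the real ldstoretypes types, and the choice to skip the event when the store is not yet initialized is an assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// keyedItem is a simplified stand-in for ldstoretypes.KeyedItemDescriptor.
type keyedItem struct {
	Key     string
	Version int
}

// storeQueries mirrors the shape of EnvStoreQueries, with a string in place
// of ldstoretypes.DataKind.
type storeQueries interface {
	IsInitialized() bool
	GetAll(kind string) ([]keyedItem, error)
}

// memoryStore is a hypothetical in-memory implementation for the example.
type memoryStore struct {
	initialized bool
	data        map[string][]keyedItem
}

func (m memoryStore) IsInitialized() bool { return m.initialized }

func (m memoryStore) GetAll(kind string) ([]keyedItem, error) {
	items, ok := m.data[kind]
	if !ok {
		return nil, errors.New("unknown kind: " + kind)
	}
	return items, nil
}

// collectPutData queries every kind and returns the combined data set.
// The second result is false when the store has no data yet, in which case
// this sketch assumes no "put" event should be generated.
func collectPutData(store storeQueries, kinds []string) (map[string][]keyedItem, bool, error) {
	if !store.IsInitialized() {
		return nil, false, nil
	}
	out := make(map[string][]keyedItem, len(kinds))
	for _, kind := range kinds {
		items, err := store.GetAll(kind)
		if err != nil {
			return nil, false, fmt.Errorf("querying %s: %w", kind, err)
		}
		out[kind] = items
	}
	return out, true, nil
}

func main() {
	store := memoryStore{
		initialized: true,
		data: map[string][]keyedItem{
			"flags":    {{Key: "flag-a", Version: 1}, {Key: "flag-b", Version: 4}},
			"segments": {},
		},
	}
	data, ok, err := collectPutData(store, []string{"flags", "segments"})
	fmt.Println(len(data["flags"]), ok, err)
}
```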

type EnvStreamProvider

type EnvStreamProvider interface {
	EnvStreamUpdates // SendAllDataUpdate, SendSingleItemUpdate

	// SendHeartbeat sends keep-alive data on the stream.
	SendHeartbeat()

	// Close releases all resources for this EnvStreamProvider and closes all connections to it.
	Close()
}

EnvStreamProvider is an abstraction of publishing events to a stream for a specific environment. Implementations of this interface are created by StreamProvider.Register().

type EnvStreamUpdates

type EnvStreamUpdates interface {
	SendAllDataUpdate(allData []ldstoretypes.Collection)
	SendSingleItemUpdate(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor)
}

EnvStreamUpdates is an interface representing the kinds of updates we can publish to streams. Other components that publish updates to EnvStreams should use this interface rather than the implementation type, both to clarify that they don't need other EnvStreams functionality and to simplify testing.
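The testing benefit can be sketched with a recording fake. The types below are simplified local stand-ins (string kinds and a reduced itemDescriptor), and publishFlagChange is a hypothetical component invented for this example; only the two method names come from the interface above.

```go
package main

import "fmt"

// itemDescriptor and collection are simplified stand-ins for the
// ldstoretypes types used by the real interface.
type itemDescriptor struct {
	Version int
	Item    interface{}
}

type collection struct {
	Kind  string
	Items map[string]itemDescriptor
}

// envStreamUpdates mirrors the two methods of EnvStreamUpdates.
type envStreamUpdates interface {
	SendAllDataUpdate(allData []collection)
	SendSingleItemUpdate(kind string, key string, item itemDescriptor)
}

// recordingUpdates is a test fake that records each call instead of
// publishing anything.
type recordingUpdates struct {
	calls []string
}

func (r *recordingUpdates) SendAllDataUpdate(allData []collection) {
	r.calls = append(r.calls, fmt.Sprintf("all:%d", len(allData)))
}

func (r *recordingUpdates) SendSingleItemUpdate(kind, key string, item itemDescriptor) {
	r.calls = append(r.calls, fmt.Sprintf("item:%s/%s@%d", kind, key, item.Version))
}

// publishFlagChange is a hypothetical component that only needs to publish
// updates, so it depends on the narrow interface rather than on EnvStreams.
func publishFlagChange(updates envStreamUpdates, key string, version int) {
	updates.SendSingleItemUpdate("flags", key, itemDescriptor{Version: version})
}

func main() {
	fake := &recordingUpdates{}
	publishFlagChange(fake, "my-flag", 3)
	fmt.Println(fake.calls)
}
```

Because publishFlagChange accepts the interface, a test can pass the fake without constructing stream providers, credentials, or a data store.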

type EnvStreams

type EnvStreams struct {
	// contains filtered or unexported fields
}

EnvStreams encapsulates streaming behavior for a specific environment.

EnvStreams itself does not know anything about what kind of streams are available; those are determined by the StreamProvider instances that are passed in the constructor, and the credentials that are registered with AddCredential. For each combination of a credential and a StreamProvider that can handle that credential, a stream is available, and data updates that are sent with the EnvStreamUpdates methods will be rebroadcast to all of those streams, in a format that is determined by each StreamProvider.
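The fan-out described above can be sketched as follows. This is a minimal model, not the package's implementation: every type here (credential, envStream, streamProvider, envStreams) is a simplified local stand-in, the endpoint labels are illustrative, and updates are reduced to plain strings.

```go
package main

import "fmt"

// credential is a simplified stand-in for config.SDKCredential.
type credential interface{ name() string }

type sdkKey string
type mobileKey string

func (k sdkKey) name() string    { return "sdk:" + string(k) }
func (k mobileKey) name() string { return "mobile:" + string(k) }

// envStream is a simplified stand-in for EnvStreamProvider.
type envStream interface{ send(update string) }

// streamProvider is a simplified stand-in for StreamProvider. register
// returns nil when the provider does not handle the credential's type.
type streamProvider interface {
	register(c credential, log *[]string) envStream
}

// loggingStream records what it would have broadcast.
type loggingStream struct {
	label string
	log   *[]string
}

func (s loggingStream) send(update string) {
	*s.log = append(*s.log, s.label+" <- "+update)
}

// sdkKeyProvider handles only sdkKey credentials.
type sdkKeyProvider struct{ endpoint string }

func (p sdkKeyProvider) register(c credential, log *[]string) envStream {
	if _, ok := c.(sdkKey); !ok {
		return nil
	}
	return loggingStream{label: p.endpoint + " " + c.name(), log: log}
}

// mobileKeyProvider handles only mobileKey credentials.
type mobileKeyProvider struct{}

func (mobileKeyProvider) register(c credential, log *[]string) envStream {
	if _, ok := c.(mobileKey); !ok {
		return nil
	}
	return loggingStream{label: "mobile-ping " + c.name(), log: log}
}

// envStreams keeps one stream per (credential, provider) pair that matched.
type envStreams struct {
	providers []streamProvider
	streams   []envStream
	log       []string
}

func newDemoEnvStreams() *envStreams {
	return &envStreams{providers: []streamProvider{
		sdkKeyProvider{endpoint: "/all"},
		sdkKeyProvider{endpoint: "/flags"},
		mobileKeyProvider{},
	}}
}

// addCredential offers the credential to every provider and keeps the
// streams from those that accept it.
func (es *envStreams) addCredential(c credential) {
	for _, p := range es.providers {
		if s := p.register(c, &es.log); s != nil {
			es.streams = append(es.streams, s)
		}
	}
}

// sendUpdate rebroadcasts one update to every active stream.
func (es *envStreams) sendUpdate(update string) {
	for _, s := range es.streams {
		s.send(update)
	}
}

func main() {
	es := newDemoEnvStreams()
	es.addCredential(sdkKey("sdk-123"))
	es.addCredential(mobileKey("mob-456"))
	es.sendUpdate("patch my-flag")
	for _, line := range es.log {
		fmt.Println(line)
	}
}
```

In this model the SDK key matches two providers and the mobile key matches one, so a single update reaches three streams.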

func NewEnvStreams

func NewEnvStreams(
	streamProviders []StreamProvider,
	storeQueries EnvStoreQueries,
	heartbeatInterval time.Duration,
	loggers ldlog.Loggers,
) *EnvStreams

NewEnvStreams creates an instance of EnvStreams.

func (*EnvStreams) AddCredential

func (es *EnvStreams) AddCredential(credential config.SDKCredential)

AddCredential adds an environment credential and creates a corresponding EnvStreamProvider for each StreamProvider that can handle that credential.

func (*EnvStreams) Close

func (es *EnvStreams) Close() error

Close shuts down all currently active streams for this environment and releases its resources.

func (*EnvStreams) RemoveCredential

func (es *EnvStreams) RemoveCredential(credential config.SDKCredential)

RemoveCredential shuts down the EnvStreamProvider, if any, for the specified credential.

func (*EnvStreams) SendAllDataUpdate

func (es *EnvStreams) SendAllDataUpdate(
	allData []ldstoretypes.Collection,
)

SendAllDataUpdate sends all appropriate stream updates for when the full data set has been refreshed.

func (*EnvStreams) SendSingleItemUpdate

func (es *EnvStreams) SendSingleItemUpdate(
	kind ldstoretypes.DataKind,
	key string,
	item ldstoretypes.ItemDescriptor,
)

SendSingleItemUpdate sends all appropriate stream updates for when an individual item has been updated.

type StreamProvider

type StreamProvider interface {
	// Handler returns an HTTP request handler for the given credential. It can return nil if it does
	// not support this type of credential.
	Handler(credential config.SDKCredential) http.HandlerFunc

	// Register tells the StreamProvider about an environment that it should support, and returns an
	// implementation of EnvStreamProvider for pushing updates related to that environment. It can
	// return nil if it does not support this type of credential.
	Register(credential config.SDKCredential, store EnvStoreQueries, loggers ldlog.Loggers) EnvStreamProvider

	// Close tells the StreamProvider to release all of its resources and close all connections.
	Close()
}

StreamProvider is an abstraction of a specific kind of SSE event stream, such as the server-side SDK "/all" stream. The streams package provides default implementations of this interface for the streams that are supported by the standard Relay Proxy.

Each StreamProvider expects a specific kind of SDKCredential, e.g. config.SDKKey for the server-side streams. If the wrong kind of credential is passed, it should behave as it would for an unrecognized key. Note that more than one StreamProvider can exist for a given credential.
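The credential check in Handler might look like the sketch below. The credential types are local stand-ins for the config package's types, the placeholder ping body is illustrative, and the caller's choice to answer 401 when Handler returns nil is an assumption; the page above does not say how an unrecognized key is reported.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// credential, sdkKey, and mobileKey are simplified stand-ins for
// config.SDKCredential and its concrete types.
type credential interface{}
type sdkKey string
type mobileKey string

// mobilePingProvider is a hypothetical provider that handles only mobile keys.
type mobilePingProvider struct{}

// Handler returns nil for any credential type this provider does not support.
func (mobilePingProvider) Handler(c credential) http.HandlerFunc {
	if _, ok := c.(mobileKey); !ok {
		return nil
	}
	return func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		fmt.Fprint(w, "event: ping\ndata:\n\n") // placeholder event body
	}
}

// statusFor plays the role of a caller: it asks the provider for a handler
// and, as an assumption of this sketch, answers 401 when there is none.
func statusFor(c credential) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)
	if h := (mobilePingProvider{}).Handler(c); h != nil {
		h(rec, req)
	} else {
		http.Error(rec, "unknown credential", http.StatusUnauthorized)
	}
	return rec.Code
}

func main() {
	fmt.Println(statusFor(mobileKey("mob-456")), statusFor(sdkKey("sdk-123")))
}
```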

func NewJSClientPingStreamProvider

func NewJSClientPingStreamProvider(maxConnTime time.Duration) StreamProvider

NewJSClientPingStreamProvider creates a StreamProvider implementation for JS client-side streaming endpoints, which will generate only "ping" events.

This is identical to NewMobilePingStreamProvider except that it only handles requests authenticated with an environment ID.

func NewMobilePingStreamProvider

func NewMobilePingStreamProvider(maxConnTime time.Duration) StreamProvider

NewMobilePingStreamProvider creates a StreamProvider implementation for mobile streaming endpoints, which will generate only "ping" events.

This is identical to NewJSClientPingStreamProvider except that it only handles requests authenticated with a mobile key.

func NewServerSideFlagsOnlyStreamProvider

func NewServerSideFlagsOnlyStreamProvider(maxConnTime time.Duration) StreamProvider

NewServerSideFlagsOnlyStreamProvider creates a StreamProvider implementation for the server-side SDK "/flags" endpoint, which is used only by old SDKs that do not support segments.

func NewServerSideStreamProvider

func NewServerSideStreamProvider(maxConnTime time.Duration) StreamProvider

NewServerSideStreamProvider creates a StreamProvider implementation for the server-side SDK "/all" endpoint, which is used by all current server-side SDK versions.
