streaming

package
v0.47.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

README

State Streaming Service

This package contains the constructors for the StreamingServices used to write state changes out from individual KVStores to a file or stream, as described in ADR-038 and defined in types/streaming.go. The child directories contain the implementations for specific output destinations.

Currently, a StreamingService implementation that writes state changes out to files is supported, in the future support for additional output destinations can be added.

The StreamingService is configured from within an App using the AppOptions loaded from the app.toml file:

# ...

[store]
# streaming is enabled if one or more streamers are defined
streamers = [
    # name of the streaming service, used by constructor
    "file"
]

[streamers]
[streamers.file]
    keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
    write_dir = "path to the write directory"
    prefix = "optional prefix to prepend to the generated file names"

The store.streamers field contains a list of the names of the StreamingService implementations to employ which are used by ServiceTypeFromString to return the ServiceConstructor for that particular implementation:

listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
    constructor, err := ServiceTypeFromString(listenerName)
    if err != nil {
    	// handle error
    }
}

The streamers field contains a mapping of the specific StreamingService implementation name to the configuration parameters for that specific service.

The streamers.x.keys field contains the list of StoreKey names for the KVStores to expose using this service and is required by every type of StreamingService. In order to expose ALL KVStores, we can include * in this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation. In the case of the file streaming service, the streamers.file.write_dir field contains the path to the directory to write the files to, and streamers.file.prefix contains an optional prefix to prepend to the output files to prevent potential collisions with other App StreamingService output files.

The ServiceConstructor accepts AppOptions, the store keys collected using streamers.x.keys, a BinaryMarshaller and returns a `StreamingService implementation.

The AppOptions are passed in to provide access to any implementation specific configuration options, e.g. in the case of the file streaming service the streamers.file.write_dir and streamers.file.prefix.

streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
    // handler error
}

The returned StreamingService is loaded into the BaseApp using the BaseApp's SetStreamingService method.

The Stream method is called on the service to begin the streaming process. Depending on the implementation this process may be synchronous or asynchronous with the message processing of the state machine.

bApp.SetStreamingService(streamingService)
wg := new(sync.WaitGroup)
quitChan := make(chan struct{})
streamingService.Stream(wg, quitChan)

Documentation

Index

Constants

View Source
const (
	OptStreamersFilePrefix          = "streamers.file.prefix"
	OptStreamersFileWriteDir        = "streamers.file.write_dir"
	OptStreamersFileOutputMetadata  = "streamers.file.output-metadata"
	OptStreamersFileStopNodeOnError = "streamers.file.stop-node-on-error"
	OptStreamersFileFsync           = "streamers.file.fsync"

	OptStoreStreamers = "store.streamers"
)

Streaming option keys

Variables

View Source
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
	File: NewFileStreamingService,
}

ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors types.

Functions

func LoadStreamingServices

func LoadStreamingServices(
	bApp *baseapp.BaseApp,
	appOpts servertypes.AppOptions,
	appCodec codec.BinaryCodec,
	logger log.Logger,
	keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error)

LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys. It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup.

func NewFileStreamingService

func NewFileStreamingService(
	opts servertypes.AppOptions,
	keys []types.StoreKey,
	marshaller codec.BinaryCodec,
	logger log.Logger,
) (baseapp.StreamingService, error)

NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService.

Types

type ServiceConstructor

ServiceConstructor is used to construct a streaming service

func NewServiceConstructor

func NewServiceConstructor(name string) (ServiceConstructor, error)

NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name.

type ServiceType

type ServiceType int

ServiceType enum for specifying the type of StreamingService

const (
	Unknown ServiceType = iota
	File
)

func ServiceTypeFromString

func ServiceTypeFromString(name string) ServiceType

ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name.

func (ServiceType) String

func (sst ServiceType) String() string

String returns the string name of a streaming.ServiceType

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL