file

package
v0.0.0-...-a24c2e4 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 24, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

README

File Streaming Service

This package contains an implementation of the StreamingService that writes the data stream out to files on the local filesystem. The writing is performed synchronously with the message processing of the state machine.

Configuration

The file.StreamingService is configured from within an App using the AppOptions loaded from the app.toml file:

[store]
    streamers = [ # if len(streamers) > 0 we are streaming
        "file", # name of the streaming service, used by constructor
    ]

[streamers]
    [streamers.file]
        keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
        write_dir = "path to the write directory"
        prefix = "optional prefix to prepend to the generated file names"

We turn the service on by adding its name, "file", to store.streamers, the list of streaming services for this App to employ.

In streamers.file we include the following configuration parameters for the file streaming service:

  1. streamers.file.keys contains the list of StoreKey names for the KVStores to expose using this service. To expose all KVStores, include * in this list. An empty list is equivalent to turning the service off.
  2. streamers.file.write_dir contains the path to the directory to write the files to.
  3. streamers.file.prefix contains an optional prefix to prepend to the output files to prevent potential collisions with other App StreamingService output files.
  4. streamers.file.output-metadata specifies whether to output the metadata file; otherwise only the data file is written.
  5. streamers.file.stop-node-on-error specifies whether to propagate errors to the consensus state machine; this is necessary for data integrity when the node restarts.
  6. streamers.file.fsync specifies whether to call fsync after writing the files; this is necessary for data integrity if the system crashes, but it slows down the commit.
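
The semantics of streamers.file.keys described above (explicit names, the * wildcard, and an empty list disabling the service) can be sketched as follows. This is only an illustration: exposesStore is a hypothetical helper, not part of this package, and the real matching logic may differ.

```go
package main

// exposesStore reports whether a KVStore called name would be streamed
// under the given streamers.file.keys setting.
// NOTE: hypothetical helper written for illustration only.
func exposesStore(keys []string, name string) bool {
	for _, k := range keys {
		// "*" exposes every store; otherwise the name must be listed.
		if k == "*" || k == name {
			return true
		}
	}
	// An empty list exposes nothing, which turns the service off.
	return false
}
```

For example, exposesStore([]string{"*"}, "bank") is true, while exposesStore(nil, "bank") is false.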

Encoding

For each block, two files are created, named block-{N}-meta and block-{N}-data, where N is the block number. The meta file is written only when output-metadata is enabled.
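
As a rough illustration of the naming scheme, the sketch below builds both file names for a block. blockFileNames is a hypothetical helper, and it assumes the optional prefix is concatenated directly in front of the name, which the text above does not spell out.

```go
package main

import "fmt"

// blockFileNames returns the meta and data file names for a block height.
// ASSUMPTION: the configured prefix is prepended verbatim, with no separator.
func blockFileNames(prefix string, height int64) (meta, data string) {
	meta = fmt.Sprintf("%sblock-%d-meta", prefix, height)
	data = fmt.Sprintf("%sblock-%d-data", prefix, height)
	return meta, data
}
```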

The meta file contains the protobuf-encoded message BlockMetadata, which holds the ABCI requests and responses of the block:

message BlockMetadata {
    message DeliverTx {
        tendermint.abci.RequestDeliverTx request = 1;
        tendermint.abci.ResponseDeliverTx response = 2;
    }
    tendermint.abci.RequestBeginBlock request_begin_block = 1;
    tendermint.abci.ResponseBeginBlock response_begin_block = 2;
    repeated DeliverTx deliver_txs = 3;
    tendermint.abci.RequestEndBlock request_end_block = 4;
    tendermint.abci.ResponseEndBlock response_end_block = 5;
    tendermint.abci.ResponseCommit response_commit = 6;
}

The data file contains a series of length-prefixed, protobuf-encoded StoreKVPairs representing the Set and Delete operations within the KVStores during the execution of the block.

Both the meta and data files are prefixed with the length of their content so that a consumer can detect whether a file is complete. The length is encoded as 8 bytes in big-endian order.
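
The 8-byte length header can be illustrated with a small framing sketch. The names frame, unframe and errIncomplete are hypothetical and not part of this package; the code only demonstrates the header layout and the completeness check a consumer could perform.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// errIncomplete signals a file that is shorter or longer than its header claims.
var errIncomplete = errors.New("incomplete file")

// frame prepends the 8-byte big-endian length of content to content.
func frame(content []byte) []byte {
	out := make([]byte, 8, 8+len(content))
	binary.BigEndian.PutUint64(out, uint64(len(content)))
	return append(out, content...)
}

// unframe verifies that file holds exactly the announced number of content
// bytes after the header and returns that content.
func unframe(file []byte) ([]byte, error) {
	if len(file) < 8 {
		return nil, errIncomplete
	}
	size := binary.BigEndian.Uint64(file[:8])
	if uint64(len(file)-8) != size {
		return nil, errIncomplete
	}
	return file[8:], nil
}
```

A file that was cut off mid-write fails the size comparison, so the consumer can skip or retry it instead of decoding partial data.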

The files are written at the ABCI Commit event. By default, any error that occurs is propagated to interrupt the consensus state machine, but fsync is not called. This gives good performance at the risk of losing data in the rare event of a system crash.

Decoding

Decoding can be sketched in pseudo-code as follows:

import os

def read_content_size(file):
  # The first 8 bytes hold the big-endian length of the remaining content.
  bz = file.read(8)
  if len(bz) < 8:
    raise ValueError("incomplete file")
  size = int.from_bytes(bz, 'big')

  # A complete file is exactly the header plus the announced content.
  if os.fstat(file.fileno()).st_size != size + 8:
    raise ValueError("incomplete file")
  return size

def decode_meta_file(file):
  read_content_size(file)
  return decode_protobuf_message(BlockMetadata, file)

def decode_data_file(file):
  end = 8 + read_content_size(file)
  # Yield StoreKVPair messages until the content is exhausted.
  while file.tell() < end:
    yield decode_length_prefixed_protobuf_message(StoreKVPair, file)
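
The same data-file decoding can be sketched in Go. The code below only splits the file into raw records; each record would then be unmarshalled as a StoreKVPair with the application's protobuf codec. splitDataFile and errIncompleteFile are hypothetical names, and the sketch assumes each record carries a protobuf-style uvarint length prefix, which the text above does not specify.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
)

var errIncompleteFile = errors.New("incomplete file")

// splitDataFile validates the 8-byte big-endian header of a data file and
// returns the raw bytes of every length-prefixed record, in order.
// ASSUMPTION: records are prefixed with a uvarint length.
func splitDataFile(file []byte) ([][]byte, error) {
	if len(file) < 8 {
		return nil, errIncompleteFile
	}
	size := binary.BigEndian.Uint64(file[:8])
	if uint64(len(file)-8) != size {
		return nil, errIncompleteFile
	}
	r := bytes.NewReader(file[8:])
	var records [][]byte
	for {
		n, err := binary.ReadUvarint(r)
		if err == io.EOF {
			return records, nil // clean end of content
		}
		if err != nil {
			return nil, err
		}
		// Reject a record that claims more bytes than remain.
		if n > uint64(r.Len()) {
			return nil, errIncompleteFile
		}
		rec := make([]byte, n)
		if _, err := io.ReadFull(r, rec); err != nil {
			return nil, err
		}
		records = append(records, rec)
	}
}
```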

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type StreamingService

type StreamingService struct {
	// contains filtered or unexported fields
}

StreamingService is a concrete implementation of StreamingService that writes state changes out to files.

func NewStreamingService

func NewStreamingService(
	writeDir, filePrefix string,
	storeKeys []types.StoreKey,
	cdc types.Codec,
	logger log.Logger,
	outputMetadata, stopNodeOnErr, fsync bool,
) (*StreamingService, error)

func (*StreamingService) Close

func (fss *StreamingService) Close() error

Close satisfies the StreamingService interface. It performs a no-op.

func (*StreamingService) ListenBeginBlock

func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error

ListenBeginBlock satisfies the ABCIListener interface. It sets the received BeginBlock request, response and the current block number. Note that these are written to file only when ListenCommit is executed and outputMetadata is set; they are reset on the next block.

func (*StreamingService) ListenCommit

func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error

ListenCommit satisfies the ABCIListener interface. It is executed during the ABCI Commit request and is responsible for writing all staged data to files. It will only return a non-nil error when stopNodeOnErr is set.

func (*StreamingService) ListenDeliverTx

func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error

ListenDeliverTx satisfies the ABCIListener interface. It appends the received DeliverTx request and response to a list of DeliverTx objects. Note that these are written to file only when ListenCommit is executed and outputMetadata is set; they are reset on the next block.

func (*StreamingService) ListenEndBlock

func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error

ListenEndBlock satisfies the ABCIListener interface. It sets the received EndBlock request, response and the current block number. Note that these are written to file only when ListenCommit is executed and outputMetadata is set; they are reset on the next block.

func (*StreamingService) Listeners

func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener

Listeners satisfies the StreamingService interface. It returns the StreamingService's underlying WriteListeners. Use for registering the underlying WriteListeners with the BaseApp.

func (*StreamingService) Stream

func (fss *StreamingService) Stream(wg *sync.WaitGroup) error

Stream satisfies the StreamingService interface. It performs a no-op.
