sink

package module
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

README

Substreams Sink

This is a Substreams Sink library. You can use to build any sink application that consumes Substreams in Golang

Features

What you get by using this library:

  • Handles connection and reconnections
  • Throughput Logging (block rates, etc)
  • Best Practices error handling

Usage

The library provides a Sinker class that can be used to connect to the Substreams API. The Sinker class is a wrapper around the substreams library, which is a low-level library that provides a convenient way to connect to the Substreams API.

The user's primary responsibility when creating a custom sink is to pass a BlockScopedDataHandler and a BlockUndoSignalHandler implementation(s) which has the following interface:

impport (
	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
)

type BlockScopedDataHandler = func(ctx context.Context, cursor *Cursor, data *pbsubstreamsrpc.BlockScopedData) error
type BlockUndoSignalHandler = func(ctx context.Context, cursor *Cursor, undoSignal *pbsubstreamsrpc.BlockUndoSignal) error

We invite you to take a look at our:

[!NOTE] We highly recommend to use the Advanced Example example for any serious sink implementation!

BlockScopedDataHandler
  • ctx context.Context is the sink.Sinker actual context.Context.
  • cursor *Cursor is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
  • data *pbsubstreamsrpc.BlockScopedData contains the data that was received from the Substreams API, refer to it's definition for proper usage.
BlockUndoSignalHandler
  • ctx context.Context is the sink.Sinker actual context.Context.
  • cursor *Cursor is the cursor to use after the undo, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
  • data *pbsubstreamsrpc.BlockUndoSignal contains the last valid block that is still valid, any data saved after this last saved block should be discarded.
Handlers Flow

The basic pattern for using the Sinker is as follows:

  • Create your data layer which is responsible for decoding the Substreams' data and saving it to the desired storage.
  • Have this object implement handling of *pbsubstreamsrpc.BlockScopedData message.
  • Have this object implement handling of *pbsubstreamsrpc.BlockUndoSignal message.
  • Create a Sinker object using sink.New and pass in the two handlers that calls your implementations.

The BlockScopedDataHandler is called for each block scoped data message that is received from the Substreams API and contains all the data output for the given Substreams module. In your handler, you are responsible for decoding and processing the data, and returning an error if there is a problem. It's there also that you should persist the cursor according to your persistence logic.

The BlockUndoSignalHandler is called when a message *pbsubstreamsrpc.BlockUndoSignal is received from the stream. Those message contains a LastValidBlock which points to the last block should be assumed to be part of the canonical chain as well as LastValidCursor which should be used as the current active cursor.

How is the *pbsubstreamsrpc.BlockUndoSignal is actually is implementation dependent. The correct behavior is to treat every piece of data that is contained in a BlockScopedData whose BlockScopedData.Clock.Number is > LastValidBlock.Number as now invalid. For example, if all written entities have a block number, one handling possibility is to delete every entities where blockNumber > LastValidBlock.Number.

From Viper

Our sink(s) are all using Viper/Cobra and a set of public StreamingFast libraries to deal with flags, logging, environment variables, etc. If you use the same structure as us, you can benefits from sink.AddFlagsToSet and sink.NewFromViper which both performs most of the boilerplate to bootstrap a sinker from flags.

Accepted Block Range

Spec is loosely (assuming a module's start block of 5):

  • <empty> => [5, ∞+
  • -1 => [5, ∞+
  • : => [5, ∞+
  • 10 => [5, 10(
  • +10 => [5, 15(
  • :+10 => [5, 15(
  • +10: => [15, ∞+
  • 10:15 => [10, 15(
  • 10:+10 => [10, 20(
  • +10:+10 => [15, 25(

Launching

The sinker can be launched by calling the Start method on the Sinker object. The Start method will block until the sinker is stopped.

The sinker implements the shutter interface which can be used to handle all shutdown logic (eg: flushing any remaining data to storage, stopping the sink in case of database disconnection, etc.)

Example uses

The following repositories are examples of how the sink library can be used:

Documentation

Index

Constants

View Source
const (
	FlagNetwork               = "network"
	FlagParams                = "params"
	FlagInsecure              = "insecure"
	FlagPlaintext             = "plaintext"
	FlagUndoBufferSize        = "undo-buffer-size"
	FlagLiveBlockTimeDelta    = "live-block-time-delta"
	FlagDevelopmentMode       = "development-mode"
	FlagFinalBlocksOnly       = "final-blocks-only"
	FlagInfiniteRetry         = "infinite-retry"
	FlagIrreversibleOnly      = "irreversible-only"
	FlagSkipPackageValidation = "skip-package-validation"
	FlagExtraHeaders          = "header"
	FlagAPIKeyEnvvar          = "api-key-envvar"
	FlagAPITokenEnvvar        = "api-token-envvar"
	FlagNoopMode              = "noop-mode"
)
View Source
const IgnoreOutputModuleType string = "@!##_IgnoreOutputModuleType_##!@"

IgnoreOutputModuleType can be used instead of the expected output module type when you want to validate this yourself, for example if you accept multiple output type(s).

View Source
const InferOutputModuleFromPackage string = "@!##_InferOutputModuleFromSpkg_##!@"

InferOutputModuleFromPackage can be used instead of the actual module's output name and has the effect that output module is extracted directly from the pbsubstreams.Package via the `SinkModule` field.

Variables

View Source
var BackprocessingCompletion = metrics.NewGauge("substreams_sink_backprocessing_completion", "Determines if backprocessing is completed, which is if we receive a first data message")
View Source
var DataMessageCount = metrics.NewCounter("substreams_sink_data_message", "The number of data message received")
View Source
var DataMessageSizeBytes = metrics.NewCounter("substreams_sink_data_message_size_bytes", "The total size of in bytes of all data message received")
View Source
var ErrBackOffExpired = errors.New("unable to complete work within backoff time limit")
View Source
var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink")
View Source
var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink")
View Source
var MessageSizeBytes = metrics.NewCounter("substreams_sink_message_size_bytes", "The number of total bytes of message received from the Substreams backend")
View Source
var ProgressMessageCount = metrics.NewGauge("substreams_sink_progress_message", "The number of progress message received")
View Source
var ProgressMessageLastBlock = metrics.NewGaugeVec("substreams_sink_progress_message_last_block", []string{"stage"}, "Latest progress reported processed range end block for each stage (not necessarily contiguous)")
View Source
var ProgressMessageLastContiguousBlock = metrics.NewGaugeVec("substreams_sink_progress_message_last_contiguous_block", []string{"stage"}, "Latest progress reported processed end block for the first completed (contiguous) range")
View Source
var ProgressMessageRunningJobs = metrics.NewGaugeVec("substreams_sink_progress_message_running_jobs", []string{"stage"}, "Latest reported number of active jobs for each stage")
View Source
var ProgressMessageTotalProcessedBlocks = metrics.NewGauge("substreams_sink_progress_message_total_processed_blocks", "Latest progress reported total processed blocks (including cached blocks from previous runs)")
View Source
var SubstreamsErrorCount = metrics.NewCounter("substreams_sink_error", "The error count we encountered when interacting with Substreams for which we had to restart the connection loop")
View Source
var UndoMessageCount = metrics.NewCounter("substreams_sink_undo_message", "The number of block undo message received")
View Source
var UnknownMessageCount = metrics.NewCounter("substreams_sink_unknown_message", "The number of unknown message received")

Functions

func AddFlagsToSet

func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagIgnored)

AddFlagsToSet can be used to import standard flags needed for sink to configure itself. By using this method to define your flag and using `cli.ConfigureViper` (import "github.com/streamingfast/cli") in your main application command, `NewFromViper` is usable to easily create a `sink.Sinker` instance.

Defines

Flag `--params` (-p) (defaults `[]`)
Flag `--insecure` (-k) (defaults `false`)
Flag `--plaintext` (defaults `false`)
Flag `--undo-buffer-size` (defaults `12`)
Flag `--live-block-time-delta` (defaults `300*time.Second`)
Flag `--development-mode` (defaults `false`)
Flag `--final-blocks-only` (defaults `false`)
Flag `--infinite-retry` (defaults `false`)
Flag `--skip-package-validation` (defaults `false`)
Flag `--header (-H)` (defaults `[]`)
Flag `--api-key-envvar` (default `SUBSTREAMS_API_KEY`)
Flag `--api-token-envvar` (default `SUBSTREAMS_API_TOKEN`)

The `ignore` field can be used to multiple times to avoid adding the specified `flags` to the the set. This can be used for example to avoid adding `--final-blocks-only` when the sink is always final only.

AddFlagsToSet(flags, sink.FlagIgnore(sink.FlagFinalBlocksOnly))

func ReadBlockRange

func ReadBlockRange(module *pbsubstreams.Module, input string) (*bstream.Range, error)

ReadBlockRange parses a block range string and returns a bstream.Range out of it using the model to resolve relative block numbers to absolute block numbers.

The block range string is of the form:

[<before>]:[<after>]

Where before and after are block numbers. If before is empty, it is resolve to the module's start block. If after is empty it means stream forever. If after is empty and before is empty, the range is the entire chain.

If before or after is prefixed with a +, it is relative to the module's start block.

func ReadManifestAndModule

func ReadManifestAndModule(
	manifestPath string,
	network string,
	paramsStrings []string,
	outputModuleName string,
	expectedOutputModuleType string,
	skipPackageValidation bool,
	zlog *zap.Logger,
) (
	pkg *pbsubstreams.Package,
	module *pbsubstreams.Module,
	outputModuleHash manifest.ModuleHash,
	err error,
)

ReadManifestAndModule reads the manifest and returns the package, the output module and its hash.

If outputModuleName is set to InferOutputModuleFromPackage, the sink will try to infer the output module from the package's sink_module field, if present.

If expectedOutputModuleType is set to IgnoreOutputModuleType, the sink will not validate the output module type.

If skipPackageValidation is set to true, the sink will not validate the package, you will have to do it yourself.

func ReadManifestAndModuleAndBlockRange

func ReadManifestAndModuleAndBlockRange(
	manifestPath string,
	network string,
	params []string,
	outputModuleName string,
	expectedOutputModuleType string,
	skipPackageValidation bool,
	blockRange string,
	zlog *zap.Logger,
) (
	pkg *pbsubstreams.Package,
	module *pbsubstreams.Module,
	outputModuleHash manifest.ModuleHash,
	resolvedBlockRange *bstream.Range,
	err error,
)

ReadManifestAndModuleAndBlockRange acts exactly like ReadManifestAndModule but also reads the block range.

func RegisterMetrics

func RegisterMetrics()

func SubstreamsModeNames

func SubstreamsModeNames() []string

SubstreamsModeNames returns a list of possible string values of SubstreamsMode.

Types

type BackOffStringer

type BackOffStringer struct{ backoff.BackOff }

func (BackOffStringer) String

func (s BackOffStringer) String() string

type Cursor

type Cursor struct {
	*bstream.Cursor
}

func MustNewCursor

func MustNewCursor(cursor string) *Cursor

func NewBlankCursor

func NewBlankCursor() *Cursor

func NewCursor

func NewCursor(cursor string) (*Cursor, error)

func (*Cursor) Block

func (c *Cursor) Block() bstream.BlockRef

func (*Cursor) IsBlank

func (c *Cursor) IsBlank() bool

func (*Cursor) IsEqualTo

func (c *Cursor) IsEqualTo(other *Cursor) bool

func (*Cursor) MarshalLogObject

func (c *Cursor) MarshalLogObject(encoder zapcore.ObjectEncoder) error

func (*Cursor) String

func (c *Cursor) String() string

String returns a string representation suitable for handling a Firehose request meaning a blank cursor returns "".

type DeltaLivenessChecker

type DeltaLivenessChecker struct {
	// contains filtered or unexported fields
}

func NewDeltaLivenessChecker

func NewDeltaLivenessChecker(delta time.Duration) *DeltaLivenessChecker

func (*DeltaLivenessChecker) IsLive

func (t *DeltaLivenessChecker) IsLive(clock *pbsubstreams.Clock) bool

type FlagIgnored

type FlagIgnored interface {
	IsIgnored(flag string) bool
}

func FlagIgnore

func FlagIgnore(in ...string) FlagIgnored

type LivenessChecker

type LivenessChecker interface {
	IsLive(block *pbsubstreams.Clock) bool
}

type Option

type Option func(s *Sinker)

func WithBlockDataBuffer

func WithBlockDataBuffer(bufferSize int) Option

WithBlockDataBuffer creates a buffer of block data which is used to handle undo fork steps.

Ensure that this buffer is large enough to capture all block reorganizations. If the buffer is too small, the sinker will not be able to handle the reorganization and will error if an undo is received for a block which has already been returned to the sink. If the buffer is too large, the sinker will take more time than necessary to write data to the sink.

If the sink is configured to handle irreversible blocks, the default buffer size is 12. If you pass 0, block data buffer will be disabled completely.

func WithBlockRange

func WithBlockRange(blockRange *bstream.Range) Option

WithBlockRange configures the Sinker instance to only stream for the range specified. If there is no range specified on the Sinker, the Sinker is going to sink automatically from module's start block to live never ending.

func WithExtraHeaders

func WithExtraHeaders(headers []string) Option

WithExtraHeaders configures the Sinker instance to send extra headers to the Substreams backend server.

func WithFinalBlocksOnly

func WithFinalBlocksOnly() Option

WithFinalBlocksOnly configures the Sinker to only stream Substreams output that is considered final by the Substreams backend server.

This means that `WithBlockDataBuffer` if used is discarded and [BlockUndoSignalHandler] will never be called.

func WithInfiniteRetry

func WithInfiniteRetry() Option

WithInfiniteRetry remove the maximum retry limit of 15 (hard-coded right now) which spans approximatively 5m so that retry is perform indefinitely without never exiting the process.

func WithLivenessChecker

func WithLivenessChecker(livenessChecker LivenessChecker) Option

WithLivenessChecker configures a [LivnessCheck] on the Sinker instance.

By configuring a liveness checker, the [MessageContext] received by [BlockScopedDataHandler] and [BlockUndoSignalHandler] will have the field [MessageContext.IsLive] properly populated.

func WithRetryBackOff

func WithRetryBackOff(backOff backoff.BackOff) Option

WithRetryBackoff configures the Sinker to which itself configurs the Substreams gRPC stream to only send pbsubstreamsrpc.BlockScopedData once the block is final, this means that `WithBlockDataBuffer` if used has is discarded and [BlockUndoSignalHandler] will never be called.

type Sinker

type Sinker struct {
	*shutter.Shutter

	NoopMode bool
	// contains filtered or unexported fields
}

func New

func New(
	mode SubstreamsMode,
	NoopMode bool,
	pkg *pbsubstreams.Package,
	outputModule *pbsubstreams.Module,
	hash manifest.ModuleHash,
	clientConfig *client.SubstreamsClientConfig,
	logger *zap.Logger,
	tracer logging.Tracer,
	opts ...Option,
) (*Sinker, error)

func NewFromViper

func NewFromViper(
	cmd *cobra.Command,
	expectedOutputModuleType string,
	endpoint, manifestPath, outputModuleName, blockRange string,
	zlog *zap.Logger,
	tracer logging.Tracer,
	opts ...Option,
) (*Sinker, error)

NewFromViper constructs a new Sinker instance from a fixed set of "known" flags.

If you want to extract the sink output module's name directly from the Substreams package, if supported by your sink, instead of an actual name for paramater `outputModuleNameArg`, use `sink.InferOutputModuleFromPackage`.

The `expectedOutputModuleType` should be the fully qualified expected Protobuf package.

The `manifestPath` can be left empty in which case we this method is going to look in the current directory for a `substreams.yaml` file. If the `manifestPath` is non-empty and points to a directory, we will look for a `substreams.yaml` file in that directory.

func (*Sinker) ApiToken

func (s *Sinker) ApiToken() string

ApiToken returns the currently defined ApiToken sets on this sinker instance, "" is no api token was configured

func (*Sinker) BlockRange

func (s *Sinker) BlockRange() *bstream.Range

func (*Sinker) ClientConfig

func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig

ClientConfig returns the the `SubstreamsClientConfig`used by this sinker instance.

func (*Sinker) EndpointConfig

func (s *Sinker) EndpointConfig() (endpoint string, plaintext bool, insecure bool)

EndpointConfig returns the endpoint configuration used by this sinker instance, this is an extraction of the `SubstreamsClientConfig` used by this sinker instance.

func (*Sinker) OutputModule

func (s *Sinker) OutputModule() *pbsubstreams.Module

func (*Sinker) OutputModuleHash

func (s *Sinker) OutputModuleHash() string

OutputModuleHash returns the module output hash, can be used by consumer to warn if the module changed between restart of the process.

func (*Sinker) OutputModuleName

func (s *Sinker) OutputModuleName() string

func (*Sinker) OutputModuleTypePrefixed

func (s *Sinker) OutputModuleTypePrefixed() (prefixed string)

OutputModuleTypePrefixed returns the prefixed output module's type so the type will always be prefixed with "proto:".

func (*Sinker) OutputModuleTypeUnprefixed

func (s *Sinker) OutputModuleTypeUnprefixed() (unprefixed string)

OutputModuleTypeUnprefixed returns the unprefixed output module's type so the type will **never** be prefixed with "proto:".

func (*Sinker) Package

func (s *Sinker) Package() *pbsubstreams.Package

func (*Sinker) Run

func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)

type SinkerCompletionHandler

type SinkerCompletionHandler interface {
	// HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when
	// the stream has correctly reached its end block. If the sinker is configured to stream live, this callback
	// will never be called.
	//
	// If the sinker terminates with an error, this callback will not be called.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
	HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error
}

SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the callback will be invoked when the sinker is done processing the requested range. This is useful to implement a checkpointing mechanism where when the range has correctly fully processed, you can do something meaningful.

type SinkerHandler

type SinkerHandler interface {
	HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error

	// HandleBlockUndoSignal defines the callback that will handle Substreams `BlockUndoSignal` messages.
	//
	// The handler receives the following arguments:
	// - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this.
	// - `undoSignal` contains the last valid block that is still valid, any data saved after this last saved block should be discarded.
	// - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted.
	//
	// The [HandleBlockUndoSignal] can be nil if the sinker is configured to stream final blocks only, otherwise it must be set,
	// the [Sinker] enforces this.
	//
	// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
	// error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify
	// the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the
	// the error.
	HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error
}

func NewSinkerHandlers

func NewSinkerHandlers(
	handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error,
	handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error,
) SinkerHandler

type Stats

type Stats struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func (*Stats) Close

func (s *Stats) Close()

func (*Stats) LogNow

func (s *Stats) LogNow()

func (*Stats) RecordBlock

func (s *Stats) RecordBlock(block bstream.BlockRef)

func (*Stats) Start

func (s *Stats) Start(each time.Duration)

type SubstreamsMode

type SubstreamsMode uint

ENUM(

Development
Production

)

const (
	// SubstreamsModeDevelopment is a SubstreamsMode of type Development.
	SubstreamsModeDevelopment SubstreamsMode = iota
	// SubstreamsModeProduction is a SubstreamsMode of type Production.
	SubstreamsModeProduction
)

func ParseSubstreamsMode

func ParseSubstreamsMode(name string) (SubstreamsMode, error)

ParseSubstreamsMode attempts to convert a string to a SubstreamsMode

func (SubstreamsMode) MarshalText

func (x SubstreamsMode) MarshalText() ([]byte, error)

MarshalText implements the text marshaller method

func (SubstreamsMode) String

func (x SubstreamsMode) String() string

String implements the Stringer interface.

func (*SubstreamsMode) UnmarshalText

func (x *SubstreamsMode) UnmarshalText(text []byte) error

UnmarshalText implements the text unmarshaller method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL