Documentation ¶
Index ¶
- Constants
- Variables
- func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagIgnored)
- func ReadBlockRange(module *pbsubstreams.Module, input string) (*bstream.Range, error)
- func ReadManifestAndModule(manifestPath string, network string, paramsStrings []string, ...) (pkg *pbsubstreams.Package, module *pbsubstreams.Module, ...)
- func ReadManifestAndModuleAndBlockRange(manifestPath string, network string, params []string, outputModuleName string, ...) (pkg *pbsubstreams.Package, module *pbsubstreams.Module, ...)
- func RegisterMetrics()
- func SubstreamsModeNames() []string
- type BackOffStringer
- type Cursor
- type DeltaLivenessChecker
- type FlagIgnored
- type LivenessChecker
- type Option
- func WithBlockDataBuffer(bufferSize int) Option
- func WithBlockRange(blockRange *bstream.Range) Option
- func WithExtraHeaders(headers []string) Option
- func WithFinalBlocksOnly() Option
- func WithInfiniteRetry() Option
- func WithLivenessChecker(livenessChecker LivenessChecker) Option
- func WithRetryBackOff(backOff backoff.BackOff) Option
- type Sinker
- func (s *Sinker) ApiToken() string
- func (s *Sinker) BlockRange() *bstream.Range
- func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig
- func (s *Sinker) EndpointConfig() (endpoint string, plaintext bool, insecure bool)
- func (s *Sinker) OutputModule() *pbsubstreams.Module
- func (s *Sinker) OutputModuleHash() string
- func (s *Sinker) OutputModuleName() string
- func (s *Sinker) OutputModuleTypePrefixed() (prefixed string)
- func (s *Sinker) OutputModuleTypeUnprefixed() (unprefixed string)
- func (s *Sinker) Package() *pbsubstreams.Package
- func (s *Sinker) Run(ctx context.Context, cursor *Cursor, handler SinkerHandler)
- type SinkerCompletionHandler
- type SinkerHandler
- type Stats
- type SubstreamsMode
Constants ¶
const ( FlagNetwork = "network" FlagParams = "params" FlagInsecure = "insecure" FlagPlaintext = "plaintext" FlagUndoBufferSize = "undo-buffer-size" FlagLiveBlockTimeDelta = "live-block-time-delta" FlagDevelopmentMode = "development-mode" FlagFinalBlocksOnly = "final-blocks-only" FlagInfiniteRetry = "infinite-retry" FlagIrreversibleOnly = "irreversible-only" FlagSkipPackageValidation = "skip-package-validation" FlagExtraHeaders = "header" FlagAPIKeyEnvvar = "api-key-envvar" FlagAPITokenEnvvar = "api-token-envvar" FlagNoopMode = "noop-mode" )
const IgnoreOutputModuleType string = "@!##_IgnoreOutputModuleType_##!@"
IgnoreOutputModuleType can be used instead of the expected output module type when you want to validate this yourself, for example if you accept multiple output type(s).
const InferOutputModuleFromPackage string = "@!##_InferOutputModuleFromSpkg_##!@"
InferOutputModuleFromPackage can be used instead of the actual module's output name and has the effect that output module is extracted directly from the pbsubstreams.Package via the `SinkModule` field.
Variables ¶
var BackprocessingCompletion = metrics.NewGauge("substreams_sink_backprocessing_completion", "Determines if backprocessing is completed, which is if we receive a first data message")
var DataMessageCount = metrics.NewCounter("substreams_sink_data_message", "The number of data message received")
var DataMessageSizeBytes = metrics.NewCounter("substreams_sink_data_message_size_bytes", "The total size of in bytes of all data message received")
var ErrBackOffExpired = errors.New("unable to complete work within backoff time limit")
var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink")
var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink")
var MessageSizeBytes = metrics.NewCounter("substreams_sink_message_size_bytes", "The number of total bytes of message received from the Substreams backend")
var ProgressMessageCount = metrics.NewGauge("substreams_sink_progress_message", "The number of progress message received")
var ProgressMessageLastBlock = metrics.NewGaugeVec("substreams_sink_progress_message_last_block", []string{"stage"}, "Latest progress reported processed range end block for each stage (not necessarily contiguous)")
var ProgressMessageLastContiguousBlock = metrics.NewGaugeVec("substreams_sink_progress_message_last_contiguous_block", []string{"stage"}, "Latest progress reported processed end block for the first completed (contiguous) range")
var ProgressMessageRunningJobs = metrics.NewGaugeVec("substreams_sink_progress_message_running_jobs", []string{"stage"}, "Latest reported number of active jobs for each stage")
var ProgressMessageTotalProcessedBlocks = metrics.NewGauge("substreams_sink_progress_message_total_processed_blocks", "Latest progress reported total processed blocks (including cached blocks from previous runs)")
var SubstreamsErrorCount = metrics.NewCounter("substreams_sink_error", "The error count we encountered when interacting with Substreams for which we had to restart the connection loop")
var UndoMessageCount = metrics.NewCounter("substreams_sink_undo_message", "The number of block undo message received")
var UnknownMessageCount = metrics.NewCounter("substreams_sink_unknown_message", "The number of unknown message received")
Functions ¶
func AddFlagsToSet ¶
func AddFlagsToSet(flags *pflag.FlagSet, ignore ...FlagIgnored)
AddFlagsToSet can be used to import standard flags needed for sink to configure itself. By using this method to define your flag and using `cli.ConfigureViper` (import "github.com/streamingfast/cli") in your main application command, `NewFromViper` is usable to easily create a `sink.Sinker` instance.
Defines
Flag `--params` (-p) (defaults `[]`) Flag `--insecure` (-k) (defaults `false`) Flag `--plaintext` (defaults `false`) Flag `--undo-buffer-size` (defaults `12`) Flag `--live-block-time-delta` (defaults `300*time.Second`) Flag `--development-mode` (defaults `false`) Flag `--final-blocks-only` (defaults `false`) Flag `--infinite-retry` (defaults `false`) Flag `--skip-package-validation` (defaults `false`) Flag `--header (-H)` (defaults `[]`) Flag `--api-key-envvar` (default `SUBSTREAMS_API_KEY`) Flag `--api-token-envvar` (default `SUBSTREAMS_API_TOKEN`)
The `ignore` argument can be used multiple times to avoid adding the specified `flags` to the set. This can be used for example to avoid adding `--final-blocks-only` when the sink is always final only.
AddFlagsToSet(flags, sink.FlagIgnore(sink.FlagFinalBlocksOnly))
func ReadBlockRange ¶
ReadBlockRange parses a block range string and returns a bstream.Range out of it using the module to resolve relative block numbers to absolute block numbers.
The block range string is of the form:
[<before>]:[<after>]
Where before and after are block numbers. If before is empty, it is resolved to the module's start block. If after is empty it means stream forever. If after is empty and before is empty, the range is the entire chain.
If before or after is prefixed with a +, it is relative to the module's start block.
func ReadManifestAndModule ¶
func ReadManifestAndModule( manifestPath string, network string, paramsStrings []string, outputModuleName string, expectedOutputModuleType string, skipPackageValidation bool, zlog *zap.Logger, ) ( pkg *pbsubstreams.Package, module *pbsubstreams.Module, outputModuleHash manifest.ModuleHash, err error, )
ReadManifestAndModule reads the manifest and returns the package, the output module and its hash.
If outputModuleName is set to InferOutputModuleFromPackage, the sink will try to infer the output module from the package's sink_module field, if present.
If expectedOutputModuleType is set to IgnoreOutputModuleType, the sink will not validate the output module type.
If skipPackageValidation is set to true, the sink will not validate the package, you will have to do it yourself.
func ReadManifestAndModuleAndBlockRange ¶
func ReadManifestAndModuleAndBlockRange( manifestPath string, network string, params []string, outputModuleName string, expectedOutputModuleType string, skipPackageValidation bool, blockRange string, zlog *zap.Logger, ) ( pkg *pbsubstreams.Package, module *pbsubstreams.Module, outputModuleHash manifest.ModuleHash, resolvedBlockRange *bstream.Range, err error, )
ReadManifestAndModuleAndBlockRange acts exactly like ReadManifestAndModule but also reads the block range.
func RegisterMetrics ¶
func RegisterMetrics()
func SubstreamsModeNames ¶
func SubstreamsModeNames() []string
SubstreamsModeNames returns a list of possible string values of SubstreamsMode.
Types ¶
type BackOffStringer ¶
type BackOffStringer struct{ backoff.BackOff }
func (BackOffStringer) String ¶
func (s BackOffStringer) String() string
type Cursor ¶
func MustNewCursor ¶
func NewBlankCursor ¶
func NewBlankCursor() *Cursor
func (*Cursor) MarshalLogObject ¶
func (c *Cursor) MarshalLogObject(encoder zapcore.ObjectEncoder) error
type DeltaLivenessChecker ¶
type DeltaLivenessChecker struct {
// contains filtered or unexported fields
}
func NewDeltaLivenessChecker ¶
func NewDeltaLivenessChecker(delta time.Duration) *DeltaLivenessChecker
func (*DeltaLivenessChecker) IsLive ¶
func (t *DeltaLivenessChecker) IsLive(clock *pbsubstreams.Clock) bool
type FlagIgnored ¶
func FlagIgnore ¶
func FlagIgnore(in ...string) FlagIgnored
type LivenessChecker ¶
type LivenessChecker interface {
IsLive(block *pbsubstreams.Clock) bool
}
type Option ¶
type Option func(s *Sinker)
func WithBlockDataBuffer ¶
WithBlockDataBuffer creates a buffer of block data which is used to handle undo fork steps.
Ensure that this buffer is large enough to capture all block reorganizations. If the buffer is too small, the sinker will not be able to handle the reorganization and will error if an undo is received for a block which has already been returned to the sink. If the buffer is too large, the sinker will take more time than necessary to write data to the sink.
If the sink is configured to handle irreversible blocks, the default buffer size is 12. If you pass 0, block data buffer will be disabled completely.
func WithBlockRange ¶
WithBlockRange configures the Sinker instance to only stream for the range specified. If there is no range specified on the Sinker, the Sinker is going to sink automatically from module's start block to live never ending.
func WithExtraHeaders ¶
WithExtraHeaders configures the Sinker instance to send extra headers to the Substreams backend server.
func WithFinalBlocksOnly ¶
func WithFinalBlocksOnly() Option
WithFinalBlocksOnly configures the Sinker to only stream Substreams output that is considered final by the Substreams backend server.
This means that `WithBlockDataBuffer` if used is discarded and [BlockUndoSignalHandler] will never be called.
func WithInfiniteRetry ¶
func WithInfiniteRetry() Option
WithInfiniteRetry removes the maximum retry limit of 15 (hard-coded right now), which spans approximately 5m, so that retries are performed indefinitely without ever exiting the process.
func WithLivenessChecker ¶
func WithLivenessChecker(livenessChecker LivenessChecker) Option
WithLivenessChecker configures a [LivenessChecker] on the Sinker instance.
By configuring a liveness checker, the [MessageContext] received by [BlockScopedDataHandler] and [BlockUndoSignalHandler] will have the field [MessageContext.IsLive] properly populated.
func WithRetryBackOff ¶
func WithRetryBackOff(backOff backoff.BackOff) Option
WithRetryBackOff configures the Sinker to use the given `backoff.BackOff` as its retry back-off policy.
type Sinker ¶
func New ¶
func New( mode SubstreamsMode, NoopMode bool, pkg *pbsubstreams.Package, outputModule *pbsubstreams.Module, hash manifest.ModuleHash, clientConfig *client.SubstreamsClientConfig, logger *zap.Logger, tracer logging.Tracer, opts ...Option, ) (*Sinker, error)
func NewFromViper ¶
func NewFromViper( cmd *cobra.Command, expectedOutputModuleType string, endpoint, manifestPath, outputModuleName, blockRange string, zlog *zap.Logger, tracer logging.Tracer, opts ...Option, ) (*Sinker, error)
NewFromViper constructs a new Sinker instance from a fixed set of "known" flags.
If you want to extract the sink output module's name directly from the Substreams package, if supported by your sink, instead of an actual name for parameter `outputModuleName`, use `sink.InferOutputModuleFromPackage`.
The `expectedOutputModuleType` should be the fully qualified expected Protobuf package.
The `manifestPath` can be left empty, in which case this method is going to look in the current directory for a `substreams.yaml` file. If the `manifestPath` is non-empty and points to a directory, we will look for a `substreams.yaml` file in that directory.
func (*Sinker) ApiToken ¶
ApiToken returns the API token currently set on this sinker instance, or "" if no API token was configured.
func (*Sinker) BlockRange ¶
func (*Sinker) ClientConfig ¶
func (s *Sinker) ClientConfig() *client.SubstreamsClientConfig
ClientConfig returns the `SubstreamsClientConfig` used by this sinker instance.
func (*Sinker) EndpointConfig ¶
EndpointConfig returns the endpoint configuration used by this sinker instance, this is an extraction of the `SubstreamsClientConfig` used by this sinker instance.
func (*Sinker) OutputModule ¶
func (s *Sinker) OutputModule() *pbsubstreams.Module
func (*Sinker) OutputModuleHash ¶
OutputModuleHash returns the module output hash, can be used by consumer to warn if the module changed between restart of the process.
func (*Sinker) OutputModuleName ¶
func (*Sinker) OutputModuleTypePrefixed ¶
OutputModuleTypePrefixed returns the prefixed output module's type so the type will always be prefixed with "proto:".
func (*Sinker) OutputModuleTypeUnprefixed ¶
OutputModuleTypeUnprefixed returns the unprefixed output module's type so the type will **never** be prefixed with "proto:".
func (*Sinker) Package ¶
func (s *Sinker) Package() *pbsubstreams.Package
type SinkerCompletionHandler ¶
type SinkerCompletionHandler interface { // HandleBlockRangeCompletion is called when the sinker is done processing the requested range, only when // the stream has correctly reached its end block. If the sinker is configured to stream live, this callback // will never be called. // // If the sinker terminates with an error, this callback will not be called. // // The handler receives the following arguments: // - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this. // - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted. HandleBlockRangeCompletion(ctx context.Context, cursor *Cursor) error }
SinkerCompletionHandler defines an extra interface that can be implemented on top of `SinkerHandler` where the callback will be invoked when the sinker is done processing the requested range. This is useful to implement a checkpointing mechanism where, once the range has been correctly and fully processed, you can do something meaningful.
type SinkerHandler ¶
type SinkerHandler interface { HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error // HandleBlockUndoSignal defines the callback that will handle Substreams `BlockUndoSignal` messages. // // The handler receives the following arguments: // - `ctx` is the context runtime, your handler should be minimal, so normally you shouldn't use this. // - `undoSignal` contains the last valid block that is still valid, any data saved after this last saved block should be discarded. // - `cursor` is the cursor at the given block, this cursor should be saved regularly as a checkpoint in case the process is interrupted. // // The [HandleBlockUndoSignal] can be nil if the sinker is configured to stream final blocks only, otherwise it must be set, // the [Sinker] enforces this. // // Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal // error and the [Sinker] will not retry it. If the error is retryable, wrap it in `derr.NewRetryableError(err)` to notify // the [Sinker] that it should retry from last valid cursor. It's your responsibility to ensure no data was persisted prior the // the error. HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error }
func NewSinkerHandlers ¶
func NewSinkerHandlers( handleBlockScopedData func(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error, handleBlockUndoSignal func(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error, ) SinkerHandler
type Stats ¶
func (*Stats) RecordBlock ¶
type SubstreamsMode ¶
type SubstreamsMode uint
ENUM(
Development Production
)
const ( // SubstreamsModeDevelopment is a SubstreamsMode of type Development. SubstreamsModeDevelopment SubstreamsMode = iota // SubstreamsModeProduction is a SubstreamsMode of type Production. SubstreamsModeProduction )
func ParseSubstreamsMode ¶
func ParseSubstreamsMode(name string) (SubstreamsMode, error)
ParseSubstreamsMode attempts to convert a string to a SubstreamsMode
func (SubstreamsMode) MarshalText ¶
func (x SubstreamsMode) MarshalText() ([]byte, error)
MarshalText implements the text marshaller method
func (SubstreamsMode) String ¶
func (x SubstreamsMode) String() string
String implements the Stringer interface.
func (*SubstreamsMode) UnmarshalText ¶
func (x *SubstreamsMode) UnmarshalText(text []byte) error
UnmarshalText implements the text unmarshaller method