Documentation ¶
Index ¶
- Constants
- Variables
- func AssertBlockRefEqual(t *testing.T, expected, actual BlockRef)
- func AssertCursorEqual(t *testing.T, expected, actual *Cursor)
- func AssertProtoEqual(t *testing.T, expected, actual proto.Message)
- func BlockFileName(block *pbbstream.Block) string
- func BlockFileNameWithSuffix(block *pbbstream.Block, suffix string) string
- func DecodeOneblockfileData(data []byte) (*pbbstream.Block, error)
- func DoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func() error) error
- func EqualsBlockRefs(left, right BlockRef) bool
- func FetchBlockFromMergedBlocksStore(ctx context.Context, num uint64, store dstore.Store) (*pbbstream.Block, error)
- func FetchBlockFromOneBlockStore(ctx context.Context, num uint64, id string, store dstore.Store) (*pbbstream.Block, error)
- func FetchBlockMetaByHashFromOneBlockStore(ctx context.Context, id string, store dstore.Store) (*pbbstream.BlockMeta, error)
- func FetchBlockMetaFromOneBlockStore(ctx context.Context, num uint64, id string, store dstore.Store) (*pbbstream.BlockMeta, error)
- func GetStreamHeadInfo(ctx context.Context, addr string) (head BlockRef, lib BlockRef, err error)
- func IsEmpty(ref BlockRef) bool
- func MustDoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func())
- func NewOneBlocksSource(lowestBlockNum uint64, store dstore.Store, handler Handler, ...) (*oneBlocksSource, error)
- func ParseFilename(filename string) (blockNum uint64, blockIDSuffix string, previousBlockIDSuffix string, ...)
- func TestBlock(id, prev string) *pbbstream.Block
- func TestBlockFromJSON(jsonContent string) *pbbstream.Block
- func TestBlockWithLIBNum(id, previousID string, newLIB uint64) *pbbstream.Block
- func TestBlockWithNumbers(id, prev string, num, prevNum uint64) *pbbstream.Block
- func TestBlockWithTimestamp(id, prev string, timestamp time.Time) *pbbstream.Block
- func TestJSONBlockWithLIBNum(id, previousID string, newLIB uint64) string
- func ToProtocol[B proto.Message](blk *pbbstream.Block) B
- func TruncateBlockID(in string) string
- func ValidateRegistry() error
- type BasicBlockRef
- type BlockDecoder
- type BlockDecoderFunc
- type BlockIDGate
- type BlockIndexProvider
- type BlockIndexProviderGetter
- type BlockNumGate
- type BlockNumberGator
- type BlockRef
- type BlockRefGetter
- func HighestBlockRefGetter(getters ...BlockRefGetter) BlockRefGetter
- func NetworkHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
- func NetworkLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
- func RetryableBlockRefGetter(attempts int, wait time.Duration, next BlockRefGetter) BlockRefGetter
- func StreamHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
- func StreamLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
- type BlockWithObj
- type Buffer
- func (b *Buffer) AllBlocks() (out []*pbbstream.Block)
- func (b *Buffer) AppendHead(blk *pbbstream.Block)
- func (b *Buffer) Contains(blockNum uint64) bool
- func (b *Buffer) Delete(blk *pbbstream.Block)
- func (b *Buffer) Exists(id string) bool
- func (b *Buffer) GetByID(id string) (blk *pbbstream.Block)
- func (b *Buffer) Head() (blk *pbbstream.Block)
- func (b *Buffer) HeadBlocks(count int) []*pbbstream.Block
- func (b *Buffer) Len() int
- func (b *Buffer) PopTail() (blockRef *pbbstream.Block)
- func (b *Buffer) Tail() (blk *pbbstream.Block)
- func (b *Buffer) TruncateTail(lowBlockNumInclusive uint64) (truncated []*pbbstream.Block)
- type Cursor
- type Cursorable
- type DBinBlockReader
- type DBinBlockWriter
- type EternalSource
- type EternalSourceOption
- type EternalSourceStartBackAtBlock
- type FileSource
- func NewFileSource(blocksStore dstore.Store, startBlockNum uint64, h Handler, logger *zap.Logger, ...) *FileSource
- func NewFileSourceFromCursor(mergedBlocksStore dstore.Store, forkedBlocksStore dstore.Store, cursor *Cursor, ...) *FileSource
- func NewFileSourceThroughCursor(mergedBlocksStore dstore.Store, forkedBlocksStore dstore.Store, ...) *FileSource
- type FileSourceFactory
- type FileSourceOption
- func FileSourceWithBlockIndexProvider(prov BlockIndexProvider) FileSourceOption
- func FileSourceWithBundleSize(bundleSize uint64) FileSourceOption
- func FileSourceWithConcurrentPreprocess(preprocFunc PreprocessFunc, threadCount int) FileSourceOption
- func FileSourceWithRetryDelay(delay time.Duration) FileSourceOption
- func FileSourceWithStopBlock(stopBlock uint64) FileSourceOption
- func FileSourceWithWhitelistedBlocks(nums ...uint64) FileSourceOption
- type ForkableObject
- type ForkableSourceFactory
- type Gate
- type GateOption
- type GateType
- type Gator
- type Handler
- type HandlerFunc
- type JoiningSource
- type JoiningSourceOption
- type LowSourceLimitGetter
- type MinimalBlockNumFilter
- type MockSource
- type MultiplexedSource
- type MultiplexedSourceOption
- type ObjectWrapper
- type OneBlockDownloaderFunc
- type OneBlockFile
- type OneBlocksSourceOption
- type ParsableTestBlock
- type PreprocessFunc
- type PreprocessedBlock
- type Preprocessor
- type Pretty
- type Range
- func MustParseRange(in string, opts ...RangeOptions) *Range
- func NewInclusiveRange(startBlock, endBlock uint64) *Range
- func NewOpenRange(startBlock uint64) *Range
- func NewRangeContaining(blockNum uint64, size uint64) (*Range, error)
- func NewRangeExcludingEnd(startBlock, endBlock uint64) *Range
- func ParseRange(in string, opts ...RangeOptions) (*Range, error)
- func (r *Range) Contains(blockNum uint64) bool
- func (r *Range) EndBlock() *uint64
- func (r *Range) Equals(other *Range) bool
- func (r *Range) IsNext(next *Range, size uint64) bool
- func (r *Range) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (r *Range) Next(size uint64) *Range
- func (r *Range) Previous(size uint64) *Range
- func (r *Range) ReachedEndBlock(blockNum uint64) bool
- func (r *Range) Size() (uint64, error)
- func (r *Range) Split(chunkSize uint64) ([]*Range, error)
- func (r *Range) StartBlock() uint64
- func (r *Range) String() string
- type RangeOptions
- type RealtimeGate
- type RealtimeTripper
- type RecentBlockGetter
- type Shutterer
- type Source
- type SourceFactory
- type SourceFromNumFactory
- type SourceFromNumFactoryWithSkipFunc
- type SourceFromRefFactory
- type StepType
- type Stepable
- type TailLock
- type TailLockOption
- type Target
- type TestBlockIndexProvider
- type TestBlockReader
- type TestBlockReaderBin
- type TestBlockWriterBin
- type TestSource
- type TestSourceFactory
- func (t *TestSourceFactory) LowestBlockNum() uint64
- func (t *TestSourceFactory) NewSource(h Handler) Source
- func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source
- func (t *TestSourceFactory) SourceFromBlockNum(blockNum uint64, h Handler) Source
- func (t *TestSourceFactory) SourceFromCursor(cursor *Cursor, h Handler) Source
- func (t *TestSourceFactory) SourceThroughCursor(start uint64, cursor *Cursor, h Handler) Source
- type TimeThresholdGator
- type Tracker
- func (t *Tracker) AddGetter(target Target, f BlockRefGetter)
- func (t *Tracker) Clone() *Tracker
- func (t *Tracker) Get(ctx context.Context, target Target) (BlockRef, error)
- func (t *Tracker) GetRelativeBlock(ctx context.Context, potentiallyNegativeBlockNum int64, target Target) (uint64, error)
- func (t *Tracker) IsNear(ctx context.Context, from Target, to Target) (bool, error)
- func (t *Tracker) IsNearManualCheck(from, to uint64) bool
- func (t *Tracker) IsNearWithResults(ctx context.Context, from Target, to Target) (fromBlockRef BlockRef, toBlockRef BlockRef, isNear bool, err error)
- func (t *Tracker) SetNearBlocksCount(count int64)
Constants ¶
const ( GateInclusive = GateType(iota) GateExclusive )
const ( StepNew = StepType(1) // First time we're seeing this block StepUndo = StepType(2) // We are undoing this block (it was came as New previously) StepIrreversible = StepType(16) // This block is now final and cannot be 'Undone' anymore (irreversible) StepStalled = StepType(32) // This block passed the LIB and is definitely forked out StepNewIrreversible = StepType(StepNew | StepIrreversible) //5 First time we're seeing this block, but we already know that it is irreversible StepsAll = StepType(StepNew | StepUndo | StepIrreversible | StepStalled) //7 useful for filters )
const ( FileSourceHeadTarget = Target("filesource-head") LiveSourceHeadTarget = Target("livesource-head") LiveSourceTailTarget = Target("livesource-tail") NetworkHeadTarget = Target("network-head") NetworkLIBTarget = Target("network-lib") BlockStreamHeadTarget = Target("bstream-head") BlockStreamLIBTarget = Target("bstream-lib") HubHeadTarget = Target("hub-head") HubLIBTarget = Target("hub-lib") )
Variables ¶
var EmptyCursor = &Cursor{ Block: BlockRefEmpty, HeadBlock: BlockRefEmpty, LIB: BlockRefEmpty, }
var ErrGetterUndefined = errors.New("no getter defined")
var ErrOpenEndedRange = errors.New("open ended range")
var ErrResolveCursor = errors.New("cannot resolve cursor")
var ErrStopBlockReached = errors.New("stop block reached")
var ErrTrackerBlockNotFound = errors.New("tracker block not found")
var GetMaxNormalLIBDistance = uint64(1000)
var GetProtocolFirstStreamableBlock = uint64(0)
bstreams.NewDBinBlockReader var GetBlockReaderFactory BlockReaderFactory bstream.NewDBinBlockWriter var GetBlockWriterFactory BlockWriterFactory var GetBlockWriterHeaderLen int
var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream"))
var NormalizeBlockID = func(in string) string {
return in
}
Functions ¶
func AssertBlockRefEqual ¶
func AssertCursorEqual ¶
func BlockFileName ¶
func BlockFileNameWithSuffix ¶
func DoForProtocol ¶
DoForProtocol extra the worker (a lambda) that will be invoked based on the received `kind` parameter. If the mapping exists, the worker is invoked and the error returned with the call. If the mapping does not exist, an error is returned. In all other cases, this function returns `nil`.
func EqualsBlockRefs ¶
func FetchBlockMetaByHashFromOneBlockStore ¶
func FetchBlockMetaByHashFromOneBlockStore( ctx context.Context, id string, store dstore.Store, ) (*pbbstream.BlockMeta, error)
FetchBlockMetaByHashFromOneBlockStore fetches a block meta by its hash from a single block store. It will list all the blocks in the store and find the one that matches the hash. If the block is not found, it returns `nil, nil`.
func GetStreamHeadInfo ¶
func MustDoForProtocol ¶
MustDoForProtocol perform the same work, but accept only non-error lambdas as the worker and an inexistant mapping will panic.
func NewOneBlocksSource ¶
func ParseFilename ¶
func TestBlockFromJSON ¶
func TestBlockWithLIBNum ¶
func TestBlockWithNumbers ¶
func TestBlockWithTimestamp ¶
func TestJSONBlockWithLIBNum ¶
func TruncateBlockID ¶
func ValidateRegistry ¶
func ValidateRegistry() error
Types ¶
type BasicBlockRef ¶
type BasicBlockRef struct {
// contains filtered or unexported fields
}
BasicBlockRef assumes the id and num are completely separated and represents two independent piece of information. The `ID()` in this case is the `id` field and the `Num()` is the `num` field.
func NewBlockRef ¶
func NewBlockRef(id string, num uint64) BasicBlockRef
func NewBlockRefFromID ¶
func NewBlockRefFromID(id string) BasicBlockRef
NewBlockRefFromID is a convenience method when the string is assumed to have the block number in the first 8 characters of the id as a big endian encoded hexadecimal number and the full string represents the ID.
func (BasicBlockRef) ID ¶
func (b BasicBlockRef) ID() string
func (BasicBlockRef) Num ¶
func (b BasicBlockRef) Num() uint64
func (BasicBlockRef) String ¶
func (b BasicBlockRef) String() string
type BlockDecoder ¶
type BlockDecoderFunc ¶
type BlockIDGate ¶
type BlockIDGate struct { MaxHoldOff int // contains filtered or unexported fields }
func NewBlockIDGate ¶
func NewBlockIDGate(blockID string, gateType GateType, h Handler, opts ...GateOption) *BlockIDGate
func (*BlockIDGate) ProcessBlock ¶
func (g *BlockIDGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
func (*BlockIDGate) SetLogger ¶
func (g *BlockIDGate) SetLogger(logger *zap.Logger)
type BlockIndexProvider ¶
type BlockIndexProviderGetter ¶
type BlockIndexProviderGetter interface {
GetIndexProvider() BlockIndexProvider
}
type BlockNumGate ¶
type BlockNumGate struct { MaxHoldOff int // contains filtered or unexported fields }
func NewBlockNumGate ¶
func NewBlockNumGate(blockNum uint64, gateType GateType, h Handler, opts ...GateOption) *BlockNumGate
func (*BlockNumGate) ProcessBlock ¶
func (g *BlockNumGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
func (*BlockNumGate) SetLogger ¶
func (g *BlockNumGate) SetLogger(logger *zap.Logger)
type BlockNumberGator ¶
type BlockNumberGator struct {
// contains filtered or unexported fields
}
func NewBlockNumberGator ¶
func NewBlockNumberGator(blockNum uint64, opts ...GateOption) *BlockNumberGator
func NewExclusiveBlockNumberGator ¶
func NewExclusiveBlockNumberGator(blockNum uint64, opts ...GateOption) *BlockNumberGator
func (*BlockNumberGator) SetLogger ¶
func (g *BlockNumberGator) SetLogger(logger *zap.Logger)
type BlockRef ¶
BlockRef represents a reference to a block and is mainly define as the pair `<BlockID, BlockNum>`. A `Block` interface should always implement the `BlockRef` interface.
The interface enforce also the creation of a `Stringer` object. We expected all format to be rendered in the form `#<BlockNum> (<Id>)`. This is to easy formatted output when using `zap.Stringer(...)`.
var BlockRefEmpty BlockRef = &emptyBlockRef{}
type BlockRefGetter ¶
BlockRefGetter is a function to retrieve a block ref from any system.
func HighestBlockRefGetter ¶
func HighestBlockRefGetter(getters ...BlockRefGetter) BlockRefGetter
func NetworkHeadBlockRefGetter ¶
func NetworkHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
func NetworkLIBBlockRefGetter ¶
func NetworkLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
func RetryableBlockRefGetter ¶
func RetryableBlockRefGetter(attempts int, wait time.Duration, next BlockRefGetter) BlockRefGetter
func StreamHeadBlockRefGetter ¶
func StreamHeadBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
func StreamLIBBlockRefGetter ¶
func StreamLIBBlockRefGetter(headinfoServiceAddr string) BlockRefGetter
type BlockWithObj ¶
type Buffer ¶
func (*Buffer) AppendHead ¶
type Cursor ¶
type Cursor struct { Step StepType Block BlockRef LIB BlockRef // last block sent as irreversible if it exists, else known forkdb LIB // HeadBlock will be the same as Block when you receive a 'new' Step, except during a reorg. // During a reorg, (steps in ['new','redo','undo']) the HeadBlock will always point to the block that causes the reorg. // When the LIB is advancing (ex: DPOSLibNum changes, etc.), step='irreversible' and the HeadBlock will be the block // that causes previous blocks to become irreversible. HeadBlock BlockRef }
func CursorFromOpaque ¶
func FromString ¶
func (*Cursor) IsOnFinalBlock ¶
type Cursorable ¶
type Cursorable interface {
Cursor() *Cursor
}
type DBinBlockReader ¶
DBinBlockReader reads the dbin format where each element is assumed to be a `Block`.
func NewDBinBlockReader ¶
func NewDBinBlockReader(reader io.Reader) (out *DBinBlockReader, err error)
func (*DBinBlockReader) ReadAsBlockMeta ¶
func (l *DBinBlockReader) ReadAsBlockMeta() (*pbbstream.BlockMeta, error)
ReadAsBlockMeta reads the next message as a BlockMeta instead of as a Block leading to reduce memory constaint since the payload are "skipped". There is a memory pressure since we need to load the full block.
But at least it's not persisent memory.
type DBinBlockWriter ¶
type DBinBlockWriter struct {
// contains filtered or unexported fields
}
DBinBlockWriter reads the dbin format where each element is assumed to be a `Block`.
func NewDBinBlockWriter ¶
func NewDBinBlockWriter(writer io.Writer) (*DBinBlockWriter, error)
NewDBinBlockWriter creates a new DBinBlockWriter that writes to 'dbin' format, the 'contentType' must be 3 characters long perfectly, version should represent a version of the content.
type EternalSource ¶
func NewDelegatingEternalSource ¶
func NewDelegatingEternalSource(sf SourceFromRefFactory, startBackAt EternalSourceStartBackAtBlock, h Handler, opts ...EternalSourceOption) *EternalSource
func NewEternalSource ¶
func NewEternalSource(sf SourceFromRefFactory, h Handler, opts ...EternalSourceOption) *EternalSource
func (*EternalSource) Run ¶
func (s *EternalSource) Run()
func (*EternalSource) SetLogger ¶
func (s *EternalSource) SetLogger(logger *zap.Logger)
type EternalSourceOption ¶
type EternalSourceOption = func(s *EternalSource)
func EternalSourceWithLogger ¶
func EternalSourceWithLogger(logger *zap.Logger) EternalSourceOption
type FileSource ¶
func NewFileSource ¶
func NewFileSource( blocksStore dstore.Store, startBlockNum uint64, h Handler, logger *zap.Logger, options ...FileSourceOption, ) *FileSource
func NewFileSourceFromCursor ¶
func NewFileSourceFromCursor( mergedBlocksStore dstore.Store, forkedBlocksStore dstore.Store, cursor *Cursor, h Handler, logger *zap.Logger, options ...FileSourceOption, ) *FileSource
func (*FileSource) Run ¶
func (s *FileSource) Run()
func (*FileSource) SetLogger ¶
func (s *FileSource) SetLogger(logger *zap.Logger)
type FileSourceFactory ¶
type FileSourceFactory struct {
// contains filtered or unexported fields
}
func NewFileSourceFactory ¶
func NewFileSourceFactory( mergedBlocksStore dstore.Store, forkedBlocksStore dstore.Store, logger *zap.Logger, options ...FileSourceOption, ) *FileSourceFactory
func (*FileSourceFactory) SourceFromBlockNum ¶
func (g *FileSourceFactory) SourceFromBlockNum(start uint64, h Handler) Source
func (*FileSourceFactory) SourceFromCursor ¶
func (g *FileSourceFactory) SourceFromCursor(cursor *Cursor, h Handler) Source
func (*FileSourceFactory) SourceThroughCursor ¶
func (g *FileSourceFactory) SourceThroughCursor(start uint64, cursor *Cursor, h Handler) Source
type FileSourceOption ¶
type FileSourceOption = func(s *FileSource)
func FileSourceWithBlockIndexProvider ¶
func FileSourceWithBlockIndexProvider(prov BlockIndexProvider) FileSourceOption
func FileSourceWithBundleSize ¶
func FileSourceWithBundleSize(bundleSize uint64) FileSourceOption
func FileSourceWithConcurrentPreprocess ¶
func FileSourceWithConcurrentPreprocess(preprocFunc PreprocessFunc, threadCount int) FileSourceOption
func FileSourceWithRetryDelay ¶
func FileSourceWithRetryDelay(delay time.Duration) FileSourceOption
func FileSourceWithStopBlock ¶
func FileSourceWithStopBlock(stopBlock uint64) FileSourceOption
func FileSourceWithWhitelistedBlocks ¶
func FileSourceWithWhitelistedBlocks(nums ...uint64) FileSourceOption
type ForkableObject ¶
type ForkableObject interface { Cursorable Stepable ObjectWrapper }
type ForkableSourceFactory ¶
type ForkableSourceFactory interface { SourceFromBlockNum(uint64, Handler) Source // irreversible SourceFromCursor(*Cursor, Handler) Source SourceThroughCursor(uint64, *Cursor, Handler) Source }
ForkableSourceFactory allows you to get a stream of fork-aware blocks from either a cursor or a final block
type GateOption ¶
type GateOption func(g Gate)
func GateOptionWithLogger ¶
func GateOptionWithLogger(logger *zap.Logger) GateOption
type Handler ¶
func WithHeadMetrics ¶
func WithHeadMetrics(h Handler, blkNum *dmetrics.HeadBlockNum, blkDrift *dmetrics.HeadTimeDrift) Handler
type HandlerFunc ¶
func (HandlerFunc) ProcessBlock ¶
func (h HandlerFunc) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
type JoiningSource ¶
JoiningSource joins an irreversible-only source (file) to a fork-aware source close to HEAD (live) 1) it tries to get the source from LiveSourceFactory (using startblock or cursor) 2) if it can't, it will ask the FileSourceFactory for a source of those blocks. 3) when it receives blocks from Filesource, it looks at LiveSource the JoiningSource will instantiate and run an 'initialSource' until it can bridge the gap
func NewJoiningSource ¶
func NewJoiningSource( fileSourceFactory, liveSourceFactory ForkableSourceFactory, h Handler, startBlockNum uint64, cursor *Cursor, cursorIsTarget bool, logger *zap.Logger, opts ...JoiningSourceOption) *JoiningSource
func (*JoiningSource) Run ¶
func (s *JoiningSource) Run()
type JoiningSourceOption ¶
type JoiningSourceOption func(s *JoiningSource)
func JoiningSourceWithFileSourceHandlerMiddleware ¶
func JoiningSourceWithFileSourceHandlerMiddleware(mw func(Handler) Handler) JoiningSourceOption
func JoiningSourceWithLiveSourceHandlerMiddleware ¶
func JoiningSourceWithLiveSourceHandlerMiddleware(mw func(Handler) Handler) JoiningSourceOption
type LowSourceLimitGetter ¶
type LowSourceLimitGetter interface {
LowestBlockNum() uint64
}
type MinimalBlockNumFilter ¶
type MinimalBlockNumFilter struct {
// contains filtered or unexported fields
}
MinimalBlockNumFilter does not let anything through that is under MinimalBlockNum
func NewMinimalBlockNumFilter ¶
func NewMinimalBlockNumFilter(blockNum uint64, h Handler) *MinimalBlockNumFilter
func (*MinimalBlockNumFilter) ProcessBlock ¶
func (f *MinimalBlockNumFilter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
type MockSource ¶
func NewMockSource ¶
func NewMockSource(blocks []*pbbstream.Block, handler Handler) *MockSource
func (*MockSource) Run ¶
func (s *MockSource) Run()
func (*MockSource) SetLogger ¶
func (s *MockSource) SetLogger(logger *zap.Logger)
type MultiplexedSource ¶
MultiplexedSource contains a gator based on realtime
func NewMultiplexedSource ¶
func NewMultiplexedSource(sourceFactories []SourceFactory, h Handler, opts ...MultiplexedSourceOption) *MultiplexedSource
func (*MultiplexedSource) Run ¶
func (s *MultiplexedSource) Run()
func (*MultiplexedSource) SetLogger ¶
func (s *MultiplexedSource) SetLogger(logger *zap.Logger)
type MultiplexedSourceOption ¶
type MultiplexedSourceOption = func(s *MultiplexedSource)
func MultiplexedSourceWithLogger ¶
func MultiplexedSourceWithLogger(logger *zap.Logger) MultiplexedSourceOption
type ObjectWrapper ¶
type ObjectWrapper interface {
WrappedObject() interface{}
}
type OneBlockDownloaderFunc ¶
type OneBlockDownloaderFunc = func(ctx context.Context, oneBlockFile *OneBlockFile) (data []byte, err error)
func OneBlockDownloaderFromStore ¶
func OneBlockDownloaderFromStore(blocksStore dstore.Store) OneBlockDownloaderFunc
type OneBlockFile ¶
type OneBlockFile struct { sync.Mutex CanonicalName string Filenames map[string]bool ID string Num uint64 LibNum uint64 PreviousID string MemoizeData []byte Deleted bool }
OneBlockFile is the representation of a single block inside one or more duplicate files, before they are merged It has a truncated ID
func MustNewOneBlockFile ¶
func MustNewOneBlockFile(fileName string) *OneBlockFile
func NewOneBlockFile ¶
func NewOneBlockFile(fileName string) (*OneBlockFile, error)
func (*OneBlockFile) Data ¶
func (f *OneBlockFile) Data(ctx context.Context, oneBlockDownloader OneBlockDownloaderFunc) ([]byte, error)
func (*OneBlockFile) String ¶
func (f *OneBlockFile) String() string
func (*OneBlockFile) ToBstreamBlock ¶
func (f *OneBlockFile) ToBstreamBlock() *pbbstream.Block
type OneBlocksSourceOption ¶
type OneBlocksSourceOption func(*oneBlocksSource)
func OneBlocksSourceLogger ¶
func OneBlocksSourceLogger(logger *zap.Logger) OneBlocksSourceOption
OneBlocksSourceLogger configures the logger to use on the one block source
func OneBlocksSourceWithSkipperFunc ¶
func OneBlocksSourceWithSkipperFunc(f func(string) bool) OneBlocksSourceOption
OneBlocksSourceWithSkipperFunc allows a lookup function to prevent downloading the same file over and over
type ParsableTestBlock ¶
type ParsableTestBlock struct { ID string `json:"id,omitempty"` ParentID string `json:"prev,omitempty"` ParentNum uint64 `json:"prevnum,omitempty"` Number uint64 `json:"num,omitempty"` LIBNum uint64 `json:"libnum,omitempty"` Timestamp string `json:"time,omitempty"` Kind int32 `json:"kind,omitempty"` Version int32 `json:"version,omitempty"` }
type PreprocessFunc ¶
type PreprocessedBlock ¶
func (*PreprocessedBlock) ID ¶
func (p *PreprocessedBlock) ID() string
func (*PreprocessedBlock) Num ¶
func (p *PreprocessedBlock) Num() uint64
func (*PreprocessedBlock) String ¶
func (p *PreprocessedBlock) String() string
type Preprocessor ¶
type Preprocessor struct {
// contains filtered or unexported fields
}
Preprocessor will run a preprocess func only if `obj` is empty or if it matches a ForkableObject where the WrappedObject() is nil
func NewPreprocessor ¶
func NewPreprocessor(preprocFunc PreprocessFunc, next Handler) *Preprocessor
func (*Preprocessor) ProcessBlock ¶
func (p *Preprocessor) ProcessBlock(blk *pbbstream.Block, obj interface{}) (err error)
type Range ¶
type Range struct {
// contains filtered or unexported fields
}
func MustParseRange ¶
func MustParseRange(in string, opts ...RangeOptions) *Range
func NewInclusiveRange ¶
func NewOpenRange ¶
func NewRangeExcludingEnd ¶
func ParseRange ¶
func ParseRange(in string, opts ...RangeOptions) (*Range, error)
ParseRange will parse a range of format 5-10, by default it will make an inclusive start & end use options to set exclusive boundaries
func (*Range) MarshalLogObject ¶
func (r *Range) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Range) ReachedEndBlock ¶
func (*Range) StartBlock ¶
type RangeOptions ¶
func WithExclusiveEnd ¶
func WithExclusiveEnd() RangeOptions
func WithExclusiveStart ¶
func WithExclusiveStart() RangeOptions
type RealtimeGate ¶
type RealtimeGate struct {
// contains filtered or unexported fields
}
func NewRealtimeGate ¶
func NewRealtimeGate(timeToRealtime time.Duration, h Handler, opts ...GateOption) *RealtimeGate
func (*RealtimeGate) ProcessBlock ¶
func (g *RealtimeGate) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
func (*RealtimeGate) SetLogger ¶
func (g *RealtimeGate) SetLogger(logger *zap.Logger)
type RealtimeTripper ¶
type RealtimeTripper struct {
// contains filtered or unexported fields
}
RealtimeTripper is a pass-through handler that executes a function before the first block goes through.
func NewRealtimeTripper ¶
func NewRealtimeTripper(timeToRealtime time.Duration, tripFunc func(), h Handler, opts ...GateOption) *RealtimeTripper
func (*RealtimeTripper) ProcessBlock ¶
func (t *RealtimeTripper) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
func (*RealtimeTripper) SetLogger ¶
func (t *RealtimeTripper) SetLogger(logger *zap.Logger)
type RecentBlockGetter ¶
type RecentBlockGetter struct {
// contains filtered or unexported fields
}
RecentBlockGetter requires a source that shuts down when ProcessBlock fails
func NewRecentBlockGetter ¶
func NewRecentBlockGetter(sampleSize int) *RecentBlockGetter
func (*RecentBlockGetter) LatestBlock ¶
func (g *RecentBlockGetter) LatestBlock() *pbbstream.Block
func (*RecentBlockGetter) ProcessBlock ¶
func (g *RecentBlockGetter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error
type SourceFactory ¶
type SourceFromNumFactory ¶
type SourceFromRefFactory ¶
type TailLock ¶
TailLock manages inflight block queries, to feed into truncation mechanism, so it happens only when buffer is full, and no one is querying the blocks.
func NewTailLock ¶
func NewTailLock(opts ...TailLockOption) *TailLock
func (*TailLock) LowerBound ¶
type TailLockOption ¶
type TailLockOption = func(s *TailLock)
func TailLockWithLogger ¶
func TailLockWithLogger(logger *zap.Logger) TailLockOption
type TestBlockIndexProvider ¶
func (*TestBlockIndexProvider) BlocksInRange ¶
func (t *TestBlockIndexProvider) BlocksInRange(lowBlockNum uint64, bundleSize uint64) (out []uint64, err error)
type TestBlockReader ¶
type TestBlockReader struct {
// contains filtered or unexported fields
}
type TestBlockReaderBin ¶
type TestBlockWriterBin ¶
type TestSource ¶
type TestSource struct { *shutter.Shutter StartBlockID string StartBlockNum uint64 Cursor *Cursor PassThroughCursor bool // contains filtered or unexported fields }
func NewTestSource ¶
func NewTestSource(h Handler) *TestSource
func (*TestSource) Run ¶
func (t *TestSource) Run()
func (*TestSource) SetLogger ¶
func (t *TestSource) SetLogger(logger *zap.Logger)
type TestSourceFactory ¶
type TestSourceFactory struct { Created chan *TestSource FromBlockNumFunc func(uint64, Handler) Source FromCursorFunc func(*Cursor, Handler) Source ThroughCursorFunc func(uint64, *Cursor, Handler) Source LowestBlkNum uint64 }
func NewTestSourceFactory ¶
func NewTestSourceFactory() *TestSourceFactory
func (*TestSourceFactory) LowestBlockNum ¶
func (t *TestSourceFactory) LowestBlockNum() uint64
func (*TestSourceFactory) NewSource ¶
func (t *TestSourceFactory) NewSource(h Handler) Source
func (*TestSourceFactory) NewSourceFromRef ¶
func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source
func (*TestSourceFactory) SourceFromBlockNum ¶
func (t *TestSourceFactory) SourceFromBlockNum(blockNum uint64, h Handler) Source
func (*TestSourceFactory) SourceFromCursor ¶
func (t *TestSourceFactory) SourceFromCursor(cursor *Cursor, h Handler) Source
func (*TestSourceFactory) SourceThroughCursor ¶
func (t *TestSourceFactory) SourceThroughCursor(start uint64, cursor *Cursor, h Handler) Source
type TimeThresholdGator ¶
type TimeThresholdGator struct {
// contains filtered or unexported fields
}
func NewTimeThresholdGator ¶
func NewTimeThresholdGator(threshold time.Duration, opts ...GateOption) *TimeThresholdGator
func (*TimeThresholdGator) SetLogger ¶
func (g *TimeThresholdGator) SetLogger(logger *zap.Logger)
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker tracks the chain progress and block history. Allows many processes to take decisions on the state of different pieces being sync'd (live) or in catch-up mode.
func NewTracker ¶
func (*Tracker) AddGetter ¶
func (t *Tracker) AddGetter(target Target, f BlockRefGetter)
func (*Tracker) GetRelativeBlock ¶
func (*Tracker) IsNearManualCheck ¶
IsNearManualCheck allows you to manually check two "already resolved values" for nearness.
func (*Tracker) IsNearWithResults ¶
func (t *Tracker) IsNearWithResults(ctx context.Context, from Target, to Target) (fromBlockRef BlockRef, toBlockRef BlockRef, isNear bool, err error)
IsNearWithResults returns BlockRefs for the two targets. It can short-circuit the lookup for `from` if `to` is near the beginning of the chain, within the `nearBlocksCount`, in which case `fromBlockRef` will be nil.
func (*Tracker) SetNearBlocksCount ¶
Source Files ¶
- block.go
- blocktypes.go
- buffer.go
- cursor.go
- cursor_resolver.go
- decoder.go
- eternalsource.go
- filesource.go
- gates.go
- gator.go
- headinfo.go
- interfaces.go
- joiningsource.go
- logging.go
- metrics.go
- mock_source.go
- multiplexedsource.go
- oneblock_source.go
- oneblockfile.go
- preprocess.go
- range.go
- reader.go
- recentblockgetter.go
- registry.go
- single_block_fetcher.go
- steps.go
- taillock.go
- testing.go
- tracker.go
- types.go
- util.go
- writer.go