Versions in this module Expand all Collapse all v0 v0.0.1 Jun 22, 2020 Changes in this version + const GateExclusive + const GateInclusive + var BlockRefEmpty = NewBlockRef("", 0) + var GetProtocolFirstBlock = uint64(0) + var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream")) + var TestBlockReaderFactory = BlockReaderFactoryFunc(testBlockReaderFactory) + var TestProtocol = pbbstream.Protocol(0xEADBEEF) + func DoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func() error) error + func MustDoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func()) + func ValidateRegistry() error + type BasicBlockRef struct + func NewBlockRef(id string, num uint64) *BasicBlockRef + func (b *BasicBlockRef) ID() string + func (b *BasicBlockRef) Num() uint64 + func (b *BasicBlockRef) String() string + type BasicBlockRefFromID struct + func NewBlockRefFromID(id BlockRefFromID) *BasicBlockRefFromID + func (b *BasicBlockRefFromID) ID() string + func (b *BasicBlockRefFromID) Num() uint64 + func (b *BasicBlockRefFromID) String() string + type Block struct + Id string + LibNum uint64 + Number uint64 + PayloadBuffer []byte + PayloadKind pbbstream.Protocol + PayloadVersion int32 + PreviousId string + Timestamp time.Time + func BlockFromBytes(bytes []byte) (*Block, error) + func BlockFromProto(b *pbbstream.Block) (*Block, error) + func MustBlockFromProto(b *pbbstream.Block) *Block + func TestBlock(id, prev string) *Block + func TestBlockFromJSON(jsonContent string) *Block + func TestBlockWithLIBNum(id, previousID string, newLIB uint64) *Block + func TestBlockWithTimestamp(id, prev string, timestamp time.Time) *Block + func (b *Block) ID() string + func (b *Block) Kind() pbbstream.Protocol + func (b *Block) LIBNum() uint64 + func (b *Block) Num() uint64 + func (b *Block) Payload() []byte + func (b *Block) PreviousID() string + func (b *Block) String() string + func (b *Block) Time() time.Time + func (b *Block) ToNative() interface{} + func (b *Block) ToProto() (*pbbstream.Block, error) + func (b *Block) Version() int32 + type BlockDecoder interface + Decode func(blk *Block) (interface{}, error) + var GetBlockDecoder BlockDecoder + type BlockDecoderFunc func(blk *Block) (interface{}, error) + func (f BlockDecoderFunc) Decode(blk *Block) (interface{}, error) + type BlockIDGate struct + MaxHoldOff int + Name string + func NewBlockIDGate(blockID string, gateType GateType, h Handler) *BlockIDGate + func (g *BlockIDGate) ProcessBlock(blk *Block, obj interface{}) error + type BlockNumGate struct + MaxHoldOff int + Name string + func NewBlockNumGate(blockNum uint64, gateType GateType, h Handler) *BlockNumGate + func (g *BlockNumGate) ProcessBlock(blk *Block, obj interface{}) error + type BlockNumberGator struct + func NewBlockNumberGator(blockNum uint64) *BlockNumberGator + func NewExclusiveBlockNumberGator(blockNum uint64) *BlockNumberGator + func (g *BlockNumberGator) Pass(block *Block) bool + func (g *BlockNumberGator) SetName(name string) + type BlockReader interface + Read func() (*Block, error) + type BlockReaderFactory interface + New func(reader io.Reader) (BlockReader, error) + var GetBlockReaderFactory BlockReaderFactory + type BlockReaderFactoryFunc func(reader io.Reader) (BlockReader, error) + func (f BlockReaderFactoryFunc) New(reader io.Reader) (BlockReader, error) + type BlockRef interface + ID func() string + Num func() uint64 + String func() string + type BlockRefFromID string + func (b BlockRefFromID) ID() string + func (b BlockRefFromID) Num() uint64 + func (b BlockRefFromID) String() string + type BlockWriter interface + Write func(block *Block) error + type BlockWriterFactory interface + New func(writer io.Writer) (BlockWriter, error) + var GetBlockWriterFactory BlockWriterFactory + type BlockWriterFactoryFunc func(writer io.Writer) (BlockWriter, error) + func (f BlockWriterFactoryFunc) New(writer io.Writer) (BlockWriter, error) + type Buffer struct + func NewBuffer(name string) *Buffer + func (b *Buffer) AllBlocks() (out []BlockRef) + func (b *Buffer) AppendHead(blk BlockRef) + func (b *Buffer) Contains(blockNum uint64) bool + func (b *Buffer) Delete(blk BlockRef) + func (b *Buffer) Exists(id string) bool + func (b *Buffer) GetByID(id string) (blk BlockRef) + func (b *Buffer) Head() (blk BlockRef) + func (b *Buffer) HeadBlocks(count int) []BlockRef + func (b *Buffer) Len() int + func (b *Buffer) PopTail() (blockRef BlockRef) + func (b *Buffer) Tail() (blk BlockRef) + func (b *Buffer) TruncateTail(lowBlockNumInclusive uint64) (truncated []BlockRef) + type EternalSource struct + func NewDelegatingEternalSource(sf SourceFromRefFactory, startBackAt EternalSourceStartBackAtBlock, h Handler) *EternalSource + func NewEternalSource(sf SourceFromRefFactory, h Handler) *EternalSource + func (s *EternalSource) Run() + type EternalSourceStartBackAtBlock func() (BlockRef, error) + type FileSource struct + Name string + func NewFileSource(blocksStore dstore.Store, startBlockNum uint64, parallelDownloads int, ...) *FileSource + func (s *FileSource) Run() + func (s *FileSource) SetLogger(logger *zap.Logger) + func (s *FileSource) SetNotFoundCallback(f func(missingBlockNum uint64)) + type FileSourceOption = func(s *FileSource) + func FileSourceWithTimeThresholdGator(threshold time.Duration) FileSourceOption + type GateType int + func (g GateType) String() string + type Gator interface + Pass func(block *Block) bool + type Handler interface + ProcessBlock func(blk *Block, obj interface{}) error + type HandlerFunc func(blk *Block, obj interface{}) error + func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error + type JoiningSource struct + func NewJoiningSource(fileSourceFactory, liveSourceFactory SourceFactory, h Handler, ...) *JoiningSource + func (s *JoiningSource) Run() + func (s *JoiningSource) SetName(name string) + type JoiningSourceOption = func(s *JoiningSource) + func JoiningSourceMergerAddr(mergerAddr string) JoiningSourceOption + func JoiningSourceName(name string) JoiningSourceOption + func JoiningSourceRateLimit(rampLength int, sleepBetweenBlocks time.Duration) JoiningSourceOption + func JoiningSourceTargetBlockID(id string) JoiningSourceOption + func JoiningSourceTargetBlockNum(num uint64) JoiningSourceOption + type MockSource struct + func NewMockSource(blocks []*Block, handler Handler) *MockSource + func (s *MockSource) Run() + type MultiplexedSource struct + func NewMultiplexedSource(sourceFactories []SourceFactory, h Handler) *MultiplexedSource + func (s *MultiplexedSource) Run() + type Pipeline interface + ProcessBlock func(blk *Block, obj interface{}) error + type PipelineFunc func(blk *Block, obj interface{}) error + func (f PipelineFunc) ProcessBlock(blk *Block, obj interface{}) error + type PipelinePreprocessor interface + PreprocessBlock func(blk *Block) (obj interface{}, err error) + type PipelineStateFlusher interface + FlushState func(exitError error) error + type PreprocessFunc func(blk *Block) (interface{}, error) + type PreprocessedBlock struct + Block *Block + Obj interface{} + func (p *PreprocessedBlock) ID() string + func (p *PreprocessedBlock) Num() uint64 + func (p *PreprocessedBlock) String() string + type Preprocessor struct + func NewPreprocessor(preprocFunc PreprocessFunc, next Handler) *Preprocessor + func (p *Preprocessor) ProcessBlock(blk *Block, obj interface{}) (err error) + type Publisher interface + Listen func() error + Publish func(*Block) (relayed bool) + type RealtimeGate struct + Name string + func NewRealtimeGate(timeToRealtime time.Duration, h Handler) *RealtimeGate + func (g *RealtimeGate) ProcessBlock(blk *Block, obj interface{}) error + func (g *RealtimeGate) SetName(name string) + type RealtimeTripper struct + func NewRealtimeTripper(timeToRealtime time.Duration, tripFunc func(), h Handler) *RealtimeTripper + func (t *RealtimeTripper) ProcessBlock(blk *Block, obj interface{}) error + type RecentBlockGetter struct + func NewRecentBlockGetter(sampleSize int) *RecentBlockGetter + func (g *RecentBlockGetter) LatestBlock() *Block + func (g *RecentBlockGetter) ProcessBlock(blk *Block, obj interface{}) error + type ShortPipelineFunc func(blk *Block) error + func (f ShortPipelineFunc) ProcessBlock(blk *Block, obj interface{}) error + type Shutterer interface + Err func() error + IsTerminated func() bool + IsTerminating func() bool + OnTerminated func(f func(error)) + OnTerminating func(f func(error)) + Shutdown func(error) + Terminated func() <-chan struct{} + Terminating func() <-chan struct{} + type SimpleTailManager struct + func NewSimpleTailManager(buffer *Buffer, bufferSize int) *SimpleTailManager + func (m *SimpleTailManager) Launch() + func (m *SimpleTailManager) TailLock(blockNum uint64) (releaseFunc func(), err error) + type Source interface + Run func() + type SourceFactory func(h Handler) Source + type SourceFromNumFactory func(startBlockNum uint64, h Handler) Source + type SourceFromNumFactoryWithErr func(startBlockNum uint64, h Handler) (Source, error) + type SourceFromRefFactory func(startBlockRef BlockRef, h Handler) Source + type StartBlockGetter func() (blockNum uint64) + type StartBlockResolver interface + Resolve func(ctx context.Context, targetBlockNum uint64) (startBlockNum uint64, previousIrreversibleID string, err error) + type StartBlockResolverFunc func(context.Context, uint64) (uint64, string, error) + func DumbStartBlockResolver(precedingBlocks uint64) StartBlockResolverFunc + func ParallelStartResolver(resolvers []StartBlockResolver, attempts int) StartBlockResolverFunc + func (s StartBlockResolverFunc) Resolve(ctx context.Context, targetBlockNum uint64) (uint64, string, error) + type Subscriber interface + GetBlockIDInBuffer func(blockNum uint64) string + Read func() (*Block, error) + Start func(channelSize int) + StartAtBlockID func(ID string) bool + Started func() bool + WaitFor func(ID string) <-chan interface{} + type TailLock struct + func NewTailLock() *TailLock + func (g *TailLock) LowerBound() uint64 + func (g *TailLock) TailLock(blockNum uint64) (releaseFunc func()) + type TestAfterProcessBlockFunc func(blk *Block, obj interface{}, result error) + type TestBlockReader struct + func (r *TestBlockReader) Read() (*Block, error) + type TestBlockReaderBin struct + DBinReader *dbin.Reader + func (l *TestBlockReaderBin) Read() (*Block, error) + type TestBlockWriterBin struct + DBinWriter *dbin.Writer + func (w *TestBlockWriterBin) Write(block *Block) error + type TestPipeline struct + func NewTestPipeline() *TestPipeline + func (p *TestPipeline) Error(err error) (blk *Block, obj interface{}, readErr error) + func (p *TestPipeline) Next() (blk *Block, obj interface{}, err error) + func (p *TestPipeline) ProcessBlock(blk *Block, obj interface{}) error + type TestPipelineMiddleware struct + func NewTestPipelineMiddleware(child Pipeline, afterProcessBlock TestAfterProcessBlockFunc) *TestPipelineMiddleware + func (p *TestPipelineMiddleware) ProcessBlock(blk *Block, obj interface{}) error + type TestPublisher struct + Blocks []*Block + func NewTestPublisher() *TestPublisher + func (TestPublisher) Listen() error + func (p *TestPublisher) Publish(blk *Block) (relayed bool) + type TestSource struct + StartBlockID string + StartBlockNum uint64 + func NewTestSource(h Handler) *TestSource + func (t *TestSource) Push(b *Block, obj interface{}) error + func (t *TestSource) Run() + type TestSourceFactory struct + Created chan *TestSource + func NewTestSourceFactory() *TestSourceFactory + func (t *TestSourceFactory) NewSource(h Handler) Source + func (t *TestSourceFactory) NewSourceFromNum(blockNum uint64, h Handler) Source + func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source + type TestSubscriber struct + WeAreThereYet bool + func NewTestSubscriber() *TestSubscriber + func (s *TestSubscriber) GetBlockIDInBuffer(blockNum uint64) string + func (s *TestSubscriber) PushBlock(blk *Block) + func (s *TestSubscriber) PushError(err error) + func (s *TestSubscriber) Read() (*Block, error) + func (s *TestSubscriber) Start(channelSize int) + func (s *TestSubscriber) StartAtBlockID(ID string) bool + func (s *TestSubscriber) Started() bool + func (s *TestSubscriber) WaitFor(ID string) <-chan interface{} + type TimeThresholdGator struct + func NewTimeThresholdGator(threshold time.Duration) *TimeThresholdGator + func (g *TimeThresholdGator) Pass(block *Block) bool + func (g *TimeThresholdGator) SetName(name string)