Documentation
¶
Index ¶
- Constants
- Variables
- func DoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func() error) error
- func MustDoForProtocol(kind pbbstream.Protocol, mappings map[pbbstream.Protocol]func())
- func ValidateRegistry() error
- type BasicBlockRef
- type BasicBlockRefFromID
- type Block
- func BlockFromBytes(bytes []byte) (*Block, error)
- func BlockFromProto(b *pbbstream.Block) (*Block, error)
- func MustBlockFromProto(b *pbbstream.Block) *Block
- func TestBlock(id, prev string) *Block
- func TestBlockFromJSON(jsonContent string) *Block
- func TestBlockWithLIBNum(id, previousID string, newLIB uint64) *Block
- func TestBlockWithTimestamp(id, prev string, timestamp time.Time) *Block
- func (b *Block) ID() string
- func (b *Block) Kind() pbbstream.Protocol
- func (b *Block) LIBNum() uint64
- func (b *Block) Num() uint64
- func (b *Block) Payload() []byte
- func (b *Block) PreviousID() string
- func (b *Block) String() string
- func (b *Block) Time() time.Time
- func (b *Block) ToNative() interface{}
- func (b *Block) ToProto() (*pbbstream.Block, error)
- func (b *Block) Version() int32
- type BlockDecoder
- type BlockDecoderFunc
- type BlockIDGate
- type BlockNumGate
- type BlockNumberGator
- type BlockReader
- type BlockReaderFactory
- type BlockReaderFactoryFunc
- type BlockRef
- type BlockRefFromID
- type BlockWriter
- type BlockWriterFactory
- type BlockWriterFactoryFunc
- type Buffer
- func (b *Buffer) AllBlocks() (out []BlockRef)
- func (b *Buffer) AppendHead(blk BlockRef)
- func (b *Buffer) Contains(blockNum uint64) bool
- func (b *Buffer) Delete(blk BlockRef)
- func (b *Buffer) Exists(id string) bool
- func (b *Buffer) GetByID(id string) (blk BlockRef)
- func (b *Buffer) Head() (blk BlockRef)
- func (b *Buffer) HeadBlocks(count int) []BlockRef
- func (b *Buffer) Len() int
- func (b *Buffer) PopTail() (blockRef BlockRef)
- func (b *Buffer) Tail() (blk BlockRef)
- func (b *Buffer) TruncateTail(lowBlockNumInclusive uint64) (truncated []BlockRef)
- type EternalSource
- type EternalSourceStartBackAtBlock
- type FileSource
- type FileSourceOption
- type GateType
- type Gator
- type Handler
- type HandlerFunc
- type JoiningSource
- type JoiningSourceOption
- func JoiningSourceMergerAddr(mergerAddr string) JoiningSourceOption
- func JoiningSourceName(name string) JoiningSourceOption
- func JoiningSourceRateLimit(rampLength int, sleepBetweenBlocks time.Duration) JoiningSourceOption
- func JoiningSourceTargetBlockID(id string) JoiningSourceOption
- func JoiningSourceTargetBlockNum(num uint64) JoiningSourceOption
- type MockSource
- type MultiplexedSource
- type Pipeline
- type PipelineFunc
- type PipelinePreprocessor
- type PipelineStateFlusher
- type PreprocessFunc
- type PreprocessedBlock
- type Preprocessor
- type Publisher
- type RealtimeGate
- type RealtimeTripper
- type RecentBlockGetter
- type ShortPipelineFunc
- type Shutterer
- type SimpleTailManager
- type Source
- type SourceFactory
- type SourceFromNumFactory
- type SourceFromNumFactoryWithErr
- type SourceFromRefFactory
- type StartBlockGetter
- type StartBlockResolver
- type StartBlockResolverFunc
- type Subscriber
- type TailLock
- type TestAfterProcessBlockFunc
- type TestBlockReader
- type TestBlockReaderBin
- type TestBlockWriterBin
- type TestPipeline
- type TestPipelineMiddleware
- type TestPublisher
- type TestSource
- type TestSourceFactory
- type TestSubscriber
- func (s *TestSubscriber) GetBlockIDInBuffer(blockNum uint64) string
- func (s *TestSubscriber) PushBlock(blk *Block)
- func (s *TestSubscriber) PushError(err error)
- func (s *TestSubscriber) Read() (*Block, error)
- func (s *TestSubscriber) Start(channelSize int)
- func (s *TestSubscriber) StartAtBlockID(ID string) bool
- func (s *TestSubscriber) Started() bool
- func (s *TestSubscriber) WaitFor(ID string) <-chan interface{}
- type TimeThresholdGator
Constants ¶
const ( GateInclusive = GateType(iota) GateExclusive )
Variables ¶
var BlockRefEmpty = NewBlockRef("", 0)
var GetProtocolFirstBlock = uint64(0)
var Metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("bstream"))
var TestBlockReaderFactory = BlockReaderFactoryFunc(testBlockReaderFactory)
var TestProtocol = pbbstream.Protocol(0xEADBEEF)
Hopefully, this block kind value will never be used!
Functions ¶
func DoForProtocol ¶
DoForProtocol extra the worker (a lambda) that will be invoked based on the received `kind` parameter. If the mapping exists, the worker is invoked and the error returned with the call. If the mapping does not exist, an error is returned. In all other cases, this function returns `nil`.
func MustDoForProtocol ¶
MustDoForProtocol perform the same work, but accept only non-error lambdas as the worker and an inexistant mapping will panic.
func ValidateRegistry ¶
func ValidateRegistry() error
Types ¶
type BasicBlockRef ¶
type BasicBlockRef struct {
// contains filtered or unexported fields
}
BasicBlockRef assumes the id and num are completely separated and represents two independent piece of information. The `ID()` in this case is the `id` field and the `Num()` is the `num` field.
func NewBlockRef ¶
func NewBlockRef(id string, num uint64) *BasicBlockRef
func (*BasicBlockRef) ID ¶
func (b *BasicBlockRef) ID() string
func (*BasicBlockRef) Num ¶
func (b *BasicBlockRef) Num() uint64
func (*BasicBlockRef) String ¶
func (b *BasicBlockRef) String() string
type BasicBlockRefFromID ¶
type BasicBlockRefFromID struct {
// contains filtered or unexported fields
}
BasicBlockRefFromID is a struct wrapper around `BlockRefFromID` but with the `num` field cached extracted from the `BlockRefFromID`. This implementation can be used for performanace critical part where you don't want to extract the block number over and over again and instead having it cached once.
func NewBlockRefFromID ¶
func NewBlockRefFromID(id BlockRefFromID) *BasicBlockRefFromID
func (*BasicBlockRefFromID) ID ¶
func (b *BasicBlockRefFromID) ID() string
func (*BasicBlockRefFromID) Num ¶
func (b *BasicBlockRefFromID) Num() uint64
func (*BasicBlockRefFromID) String ¶
func (b *BasicBlockRefFromID) String() string
type Block ¶
type Block struct { Id string Number uint64 PreviousId string Timestamp time.Time LibNum uint64 PayloadKind pbbstream.Protocol PayloadVersion int32 PayloadBuffer []byte // contains filtered or unexported fields }
Block reprensents a block abstraction across all dfuse systems and for now is wide enough to accomodate a varieties of implementation. It's the actual stucture that flows all around `bstream`.
func BlockFromBytes ¶
func MustBlockFromProto ¶
func TestBlockFromJSON ¶
func TestBlockWithLIBNum ¶
func TestBlockWithTimestamp ¶
func (*Block) PreviousID ¶
type BlockDecoder ¶
var GetBlockDecoder BlockDecoder
type BlockDecoderFunc ¶
func (BlockDecoderFunc) Decode ¶
func (f BlockDecoderFunc) Decode(blk *Block) (interface{}, error)
type BlockIDGate ¶
func NewBlockIDGate ¶
func NewBlockIDGate(blockID string, gateType GateType, h Handler) *BlockIDGate
func (*BlockIDGate) ProcessBlock ¶
func (g *BlockIDGate) ProcessBlock(blk *Block, obj interface{}) error
type BlockNumGate ¶
func NewBlockNumGate ¶
func NewBlockNumGate(blockNum uint64, gateType GateType, h Handler) *BlockNumGate
func (*BlockNumGate) ProcessBlock ¶
func (g *BlockNumGate) ProcessBlock(blk *Block, obj interface{}) error
type BlockNumberGator ¶
type BlockNumberGator struct {
// contains filtered or unexported fields
}
func NewBlockNumberGator ¶
func NewBlockNumberGator(blockNum uint64) *BlockNumberGator
func NewExclusiveBlockNumberGator ¶
func NewExclusiveBlockNumberGator(blockNum uint64) *BlockNumberGator
func (*BlockNumberGator) Pass ¶
func (g *BlockNumberGator) Pass(block *Block) bool
func (*BlockNumberGator) SetName ¶
func (g *BlockNumberGator) SetName(name string)
type BlockReader ¶
BlockReader is a reader protocol reading out bstream `Block` from a stream source. The reader respects the `io.Reader` contract in respect to `io.EOF`, i.e. it's possible to that both `block, io.EOF` be returned by the reader.
You shall treat a non-nil block regardless of the `err` as if present, it's guaranteed it's valid. The subsequent call will still return `nil, io.EOF`.
type BlockReaderFactory ¶
type BlockReaderFactory interface {
New(reader io.Reader) (BlockReader, error)
}
var GetBlockReaderFactory BlockReaderFactory
type BlockReaderFactoryFunc ¶
type BlockReaderFactoryFunc func(reader io.Reader) (BlockReader, error)
func (BlockReaderFactoryFunc) New ¶
func (f BlockReaderFactoryFunc) New(reader io.Reader) (BlockReader, error)
type BlockRef ¶
BlockRef represents a reference to a block and is mainly define as the pair `<BlockID, BlockNum>`. A `Block` interface should always implements the `BlockRef` interface.
The interface enforce also the creation of a `Stringer` object. We expected all format to be rendered in the form `#<BlockNum> (<Id>)`. This is to easy formatted output when using `zap.Stringer(...)`.
type BlockRefFromID ¶
type BlockRefFromID string
BlockRefFromID is a simple wrapper around a string assuming the block number is in the first 8 characters of the id as a big endian encoded hexadecimal number and the full string represents the ID.
func (BlockRefFromID) ID ¶
func (b BlockRefFromID) ID() string
func (BlockRefFromID) Num ¶
func (b BlockRefFromID) Num() uint64
func (BlockRefFromID) String ¶
func (b BlockRefFromID) String() string
type BlockWriter ¶
type BlockWriterFactory ¶
type BlockWriterFactory interface {
New(writer io.Writer) (BlockWriter, error)
}
var GetBlockWriterFactory BlockWriterFactory
type BlockWriterFactoryFunc ¶
type BlockWriterFactoryFunc func(writer io.Writer) (BlockWriter, error)
func (BlockWriterFactoryFunc) New ¶
func (f BlockWriterFactoryFunc) New(writer io.Writer) (BlockWriter, error)
type Buffer ¶
func (*Buffer) AppendHead ¶
func (*Buffer) HeadBlocks ¶
func (*Buffer) Len ¶
Len() locks the buffer and returns its length. Watch out for deadlocks between buffer.lock and promises.lock if using this internally.
func (*Buffer) TruncateTail ¶
type EternalSource ¶
func NewDelegatingEternalSource ¶
func NewDelegatingEternalSource(sf SourceFromRefFactory, startBackAt EternalSourceStartBackAtBlock, h Handler) *EternalSource
func NewEternalSource ¶
func NewEternalSource(sf SourceFromRefFactory, h Handler) *EternalSource
func (*EternalSource) Run ¶
func (s *EternalSource) Run()
type FileSource ¶
func NewFileSource ¶
func NewFileSource( blocksStore dstore.Store, startBlockNum uint64, parallelDownloads int, preprocFunc PreprocessFunc, h Handler, options ...FileSourceOption, ) *FileSource
NewFileSource will pipe potentially stream you 99 blocks before the given `startBlockNum`.
func (*FileSource) Run ¶
func (s *FileSource) Run()
func (*FileSource) SetLogger ¶
func (s *FileSource) SetLogger(logger *zap.Logger)
func (*FileSource) SetNotFoundCallback ¶
func (s *FileSource) SetNotFoundCallback(f func(missingBlockNum uint64))
SetNotFoundCallback sets a callback function to be triggered when a blocks file is not found. Useful for joining with unmerged blocks
type FileSourceOption ¶
type FileSourceOption = func(s *FileSource)
func FileSourceWithTimeThresholdGator ¶
func FileSourceWithTimeThresholdGator(threshold time.Duration) FileSourceOption
type HandlerFunc ¶
func (HandlerFunc) ProcessBlock ¶
func (h HandlerFunc) ProcessBlock(blk *Block, obj interface{}) error
type JoiningSource ¶
func NewJoiningSource ¶
func NewJoiningSource(fileSourceFactory, liveSourceFactory SourceFactory, h Handler, options ...JoiningSourceOption) *JoiningSource
func (*JoiningSource) Run ¶
func (s *JoiningSource) Run()
func (*JoiningSource) SetName ¶
func (s *JoiningSource) SetName(name string)
type JoiningSourceOption ¶
type JoiningSourceOption = func(s *JoiningSource)
func JoiningSourceMergerAddr ¶
func JoiningSourceMergerAddr(mergerAddr string) JoiningSourceOption
func JoiningSourceName ¶
func JoiningSourceName(name string) JoiningSourceOption
func JoiningSourceRateLimit ¶
func JoiningSourceRateLimit(rampLength int, sleepBetweenBlocks time.Duration) JoiningSourceOption
func JoiningSourceTargetBlockID ¶
func JoiningSourceTargetBlockID(id string) JoiningSourceOption
JoiningSourceTargetBlockID is an option for when we know right away the ID of the block where we want to start. In this case we'll accept that block coming from live stream right away (if we have not started processing blocks from file) it prevents waiting for a file to be merged when the live stream is serving us our startBlockID This is not recommended from a block number because we could have missed a "version" of that block number from the live source, but in a filesource, we always start from the first occurence of a blocknum.
func JoiningSourceTargetBlockNum ¶
func JoiningSourceTargetBlockNum(num uint64) JoiningSourceOption
JoiningSourceTargetBlockNum is like JoiningSourceTargetBlockID but allows starting immediately from block num == 2 on EOS or similar You better be DAMN SURE that you won't get a forked block at this number, beware
type MockSource ¶
func NewMockSource ¶
func NewMockSource(blocks []*Block, handler Handler) *MockSource
func (*MockSource) Run ¶
func (s *MockSource) Run()
type MultiplexedSource ¶
MultiplexedSource contains a gator based on realtime
func NewMultiplexedSource ¶
func NewMultiplexedSource(sourceFactories []SourceFactory, h Handler) *MultiplexedSource
func (*MultiplexedSource) Run ¶
func (s *MultiplexedSource) Run()
type Pipeline ¶
Pipeline will process all blocks through the `ProcessBlock()` function, unless the pipeline implements also `PipelinePreprocessor`, the `obj` will always nil.
If the pipeline was initialized with `irreversibleOnly`, only blk.Irreversible blocks will be processed (and post-processed). This is when used in conjunction with the irreversible index in parallel operations.
type PipelineFunc ¶
func (PipelineFunc) ProcessBlock ¶
func (f PipelineFunc) ProcessBlock(blk *Block, obj interface{}) error
type PipelinePreprocessor ¶
type PipelinePreprocessor interface { Pipeline PreprocessBlock(blk *Block) (obj interface{}, err error) }
ParallelPipeline pre-processes the `blk` in parallel. If the returned `obj` is non-nil, it is passed to the `ProcessBlock()` function. If it is nil, the `ProcessBlock` call is skipped for this block.
Even if `ParallelPreprocess` is called in parallel, the calls to `ProcessBlock()` are guaranteed to be run linearly in the order received from the logs. Note that some blocks can arrive out of order when forked. You should be certain of a linear order if the Afterburner is configured to feed only irreversible blocks.
type PipelineStateFlusher ¶
PipelineStateFlusher implements the FlushState() method, called when shutting down the Pipeliner
type PreprocessFunc ¶
type PreprocessedBlock ¶
type PreprocessedBlock struct { Block *Block Obj interface{} }
func (*PreprocessedBlock) ID ¶
func (p *PreprocessedBlock) ID() string
func (*PreprocessedBlock) Num ¶
func (p *PreprocessedBlock) Num() uint64
func (*PreprocessedBlock) String ¶
func (p *PreprocessedBlock) String() string
type Preprocessor ¶
type Preprocessor struct {
// contains filtered or unexported fields
}
func NewPreprocessor ¶
func NewPreprocessor(preprocFunc PreprocessFunc, next Handler) *Preprocessor
func (*Preprocessor) ProcessBlock ¶
func (p *Preprocessor) ProcessBlock(blk *Block, obj interface{}) (err error)
type RealtimeGate ¶
type RealtimeGate struct { Name string // contains filtered or unexported fields }
func NewRealtimeGate ¶
func NewRealtimeGate(timeToRealtime time.Duration, h Handler) *RealtimeGate
func (*RealtimeGate) ProcessBlock ¶
func (g *RealtimeGate) ProcessBlock(blk *Block, obj interface{}) error
func (*RealtimeGate) SetName ¶
func (g *RealtimeGate) SetName(name string)
type RealtimeTripper ¶
type RealtimeTripper struct {
// contains filtered or unexported fields
}
RealtimeTripper is a pass-through handler that executes a function before the first block goes through.
func NewRealtimeTripper ¶
func NewRealtimeTripper(timeToRealtime time.Duration, tripFunc func(), h Handler) *RealtimeTripper
func (*RealtimeTripper) ProcessBlock ¶
func (t *RealtimeTripper) ProcessBlock(blk *Block, obj interface{}) error
type RecentBlockGetter ¶
type RecentBlockGetter struct {
// contains filtered or unexported fields
}
RecentBlockGetter requires a source that shuts down when ProcessBlock fails
func NewRecentBlockGetter ¶
func NewRecentBlockGetter(sampleSize int) *RecentBlockGetter
func (*RecentBlockGetter) LatestBlock ¶
func (g *RecentBlockGetter) LatestBlock() *Block
func (*RecentBlockGetter) ProcessBlock ¶
func (g *RecentBlockGetter) ProcessBlock(blk *Block, obj interface{}) error
type ShortPipelineFunc ¶
func (ShortPipelineFunc) ProcessBlock ¶
func (f ShortPipelineFunc) ProcessBlock(blk *Block, obj interface{}) error
type SimpleTailManager ¶
func NewSimpleTailManager ¶
func NewSimpleTailManager(buffer *Buffer, bufferSize int) *SimpleTailManager
func (*SimpleTailManager) Launch ¶
func (m *SimpleTailManager) Launch()
func (*SimpleTailManager) TailLock ¶
func (m *SimpleTailManager) TailLock(blockNum uint64) (releaseFunc func(), err error)
type SourceFactory ¶
type SourceFromNumFactory ¶
type SourceFromRefFactory ¶
type StartBlockGetter ¶
type StartBlockGetter func() (blockNum uint64)
type StartBlockResolver ¶
type StartBlockResolver interface {
Resolve(ctx context.Context, targetBlockNum uint64) (startBlockNum uint64, previousIrreversibleID string, err error)
}
StartBlockResolver should give you a start block number that will guarantee covering all necessary blocks to handle forks before the block that you want. This requires chain-specific implementations.
A StartBlockResolver helps determine what is the lowest block that you have to fetch from your block source to ensure that you can handle forks for a given target start block
ex: I want to start at block 1000 and I may have to start at block 700 if I don't have knowledge of which block 1000 is "irreversible")
- the DumbStartBlockResolver may simply tell you to start at block 500 and be done with it.
- a StartBlockResolver based on more data could tell you that you can start at block 1000 but that you need to set the irreversible ID to "00001000deadbeef" in your `forkable` (InclusiveLIB) so that you don't start on a forked block that can't be resolved
- a StartBlockResolver based on a blocksource for EOSIO could fetch the "dposLIBNum" of your targetStartBlock, and tell you to start at that block (ex: 727)
type StartBlockResolverFunc ¶
func DumbStartBlockResolver ¶
func DumbStartBlockResolver(precedingBlocks uint64) StartBlockResolverFunc
DumbStartBlockResolver will help you start x blocks before your target start block
func ParallelStartResolver ¶
func ParallelStartResolver(resolvers []StartBlockResolver, attempts int) StartBlockResolverFunc
ParallelStartResolver will call multiple resolvers to get the fastest answer. It retries each resolver 'attempts' time before bailing out. If attempts<0, it will retry forever.
type Subscriber ¶
type Subscriber interface { Read() (*Block, error) StartAtBlockID(ID string) bool GetBlockIDInBuffer(blockNum uint64) string Start(channelSize int) Started() bool WaitFor(ID string) <-chan interface{} Shutterer }
Subscriber is a live blocks subscriber implementation
type TailLock ¶
TailLock manages inflight block queries, to feed into truncation mechanism, so it happens only when buffer is full, and no one is querying the blocks.
func NewTailLock ¶
func NewTailLock() *TailLock
func (*TailLock) LowerBound ¶
type TestBlockReader ¶
type TestBlockReader struct {
// contains filtered or unexported fields
}
func (*TestBlockReader) Read ¶
func (r *TestBlockReader) Read() (*Block, error)
type TestBlockReaderBin ¶
func (*TestBlockReaderBin) Read ¶
func (l *TestBlockReaderBin) Read() (*Block, error)
type TestBlockWriterBin ¶
func (*TestBlockWriterBin) Write ¶
func (w *TestBlockWriterBin) Write(block *Block) error
type TestPipeline ¶
type TestPipeline struct {
// contains filtered or unexported fields
}
TestPipeline is an instrumented Pipeline object.
func NewTestPipeline ¶
func NewTestPipeline() *TestPipeline
func (*TestPipeline) Error ¶
func (p *TestPipeline) Error(err error) (blk *Block, obj interface{}, readErr error)
Error consumes the next ProcessBlock and returns the provided error.
func (*TestPipeline) Next ¶
func (p *TestPipeline) Next() (blk *Block, obj interface{}, err error)
Next consumes the net block and provides a `nil` error.
func (*TestPipeline) ProcessBlock ¶
func (p *TestPipeline) ProcessBlock(blk *Block, obj interface{}) error
ProcessBlock implements the `Pipeline` interface.
type TestPipelineMiddleware ¶
type TestPipelineMiddleware struct { Pipeline // contains filtered or unexported fields }
TestPipelineMiddleware is a simplistic middleware with support only for ProcessBlock. It does *not* handle FlushState (yet), nor PreprocessBlock.
func NewTestPipelineMiddleware ¶
func NewTestPipelineMiddleware(child Pipeline, afterProcessBlock TestAfterProcessBlockFunc) *TestPipelineMiddleware
func (*TestPipelineMiddleware) ProcessBlock ¶
func (p *TestPipelineMiddleware) ProcessBlock(blk *Block, obj interface{}) error
type TestPublisher ¶
type TestPublisher struct {
Blocks []*Block
}
func NewTestPublisher ¶
func NewTestPublisher() *TestPublisher
func (TestPublisher) Listen ¶
func (TestPublisher) Listen() error
func (*TestPublisher) Publish ¶
func (p *TestPublisher) Publish(blk *Block) (relayed bool)
type TestSource ¶
type TestSource struct { *shutter.Shutter StartBlockID string StartBlockNum uint64 // contains filtered or unexported fields }
func NewTestSource ¶
func NewTestSource(h Handler) *TestSource
func (*TestSource) Push ¶
func (t *TestSource) Push(b *Block, obj interface{}) error
func (*TestSource) Run ¶
func (t *TestSource) Run()
type TestSourceFactory ¶
type TestSourceFactory struct {
Created chan *TestSource
}
func NewTestSourceFactory ¶
func NewTestSourceFactory() *TestSourceFactory
func (*TestSourceFactory) NewSource ¶
func (t *TestSourceFactory) NewSource(h Handler) Source
func (*TestSourceFactory) NewSourceFromNum ¶
func (t *TestSourceFactory) NewSourceFromNum(blockNum uint64, h Handler) Source
func (*TestSourceFactory) NewSourceFromRef ¶
func (t *TestSourceFactory) NewSourceFromRef(ref BlockRef, h Handler) Source
type TestSubscriber ¶
type TestSubscriber struct { *shutter.Shutter WeAreThereYet bool // contains filtered or unexported fields }
TestSubscriber instruments a Subscriber, implementing `Read()` and `Shutdown()`.
func NewTestSubscriber ¶
func NewTestSubscriber() *TestSubscriber
func (*TestSubscriber) GetBlockIDInBuffer ¶
func (s *TestSubscriber) GetBlockIDInBuffer(blockNum uint64) string
func (*TestSubscriber) PushBlock ¶
func (s *TestSubscriber) PushBlock(blk *Block)
func (*TestSubscriber) PushError ¶
func (s *TestSubscriber) PushError(err error)
func (*TestSubscriber) Read ¶
func (s *TestSubscriber) Read() (*Block, error)
func (*TestSubscriber) Start ¶
func (s *TestSubscriber) Start(channelSize int)
func (*TestSubscriber) StartAtBlockID ¶
func (s *TestSubscriber) StartAtBlockID(ID string) bool
func (*TestSubscriber) Started ¶
func (s *TestSubscriber) Started() bool
func (*TestSubscriber) WaitFor ¶
func (s *TestSubscriber) WaitFor(ID string) <-chan interface{}
type TimeThresholdGator ¶
type TimeThresholdGator struct {
// contains filtered or unexported fields
}
func NewTimeThresholdGator ¶
func NewTimeThresholdGator(threshold time.Duration) *TimeThresholdGator
func (*TimeThresholdGator) Pass ¶
func (g *TimeThresholdGator) Pass(block *Block) bool
func (*TimeThresholdGator) SetName ¶
func (g *TimeThresholdGator) SetName(name string)