Documentation ¶
Index ¶
- Variables
- func AvroInt(b int) []byte
- func Ready()
- type Batch
- type BatchRecord
- type ChainUpdate
- type Message
- type MessageProcessor
- func (mp *MessageProcessor) ProcessCompleteBatch(pb *PendingBatch)
- func (mp *MessageProcessor) ProcessMessage(m ResumptionMessage) error
- func (mp *MessageProcessor) Subscribe(ch chan<- *PendingBatch) types.Subscription
- func (mp *MessageProcessor) SubscribeReorgs(ch chan<- map[int64]types.Hash) types.Subscription
- func (mp *MessageProcessor) SubscribeWaiters(ch chan<- *Waiter) types.Subscription
- func (mp *MessageProcessor) WhyNotReady(hash types.Hash) string
- type MessageType
- type OrderedMessageProcessor
- func (omp *OrderedMessageProcessor) Close()
- func (omp *OrderedMessageProcessor) HandlePendingBatch(pb *PendingBatch, reorg bool)
- func (omp *OrderedMessageProcessor) HandleReorg(num int64, hash types.Hash)
- func (omp *OrderedMessageProcessor) ProcessCompleteBatch(pb *PendingBatch)
- func (omp *OrderedMessageProcessor) ProcessMessage(m ResumptionMessage) error
- func (omp *OrderedMessageProcessor) Subscribe(ch interface{}) types.Subscription
- func (omp *OrderedMessageProcessor) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
- func (omp *OrderedMessageProcessor) WhyNotReady(hash types.Hash) string
- type PendingBatch
- type Producer
- func (p *Producer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, ...) (map[string][]Message, error)
- func (p *Producer) Reorg(number int64, hash types.Hash) Message
- func (p *Producer) ReorgDone(number int64, hash types.Hash) Message
- func (p *Producer) SendBatch(batchid types.Hash, deletes []string, update map[string][]byte) (map[string][]Message, error)
- type ResumptionMessage
- type SubBatchRecord
- type Waiter
Constants ¶
This section is empty.
Variables ¶
var ( ErrPrefixMissing = fmt.Errorf("prefix missing") ErrPrefixOverlap = fmt.Errorf("producers must not have overlapping prefixes") ErrUnknownBatch = fmt.Errorf("unknown batch") ErrInvalidBatch = fmt.Errorf("invalid batch") ErrPrefixConflict = fmt.Errorf("deletes, updates, and batches cannot have prefix conflicts") )
Functions ¶
Types ¶
type Batch ¶
type Batch struct { Number int64 `avro:"num"` Weight []byte `avro:"weight"` ParentHash types.Hash `avro:"parent"` Updates map[string]BatchRecord `avro:"updates"` Hash types.Hash }
func UnmarshalBatch ¶
func (*Batch) EncodeAvro ¶
type BatchRecord ¶
type ChainUpdate ¶
type ChainUpdate struct {
// contains filtered or unexported fields
}
func (*ChainUpdate) Added ¶
func (c *ChainUpdate) Added() []*PendingBatch
func (*ChainUpdate) Done ¶ added in v1.5.0
func (c *ChainUpdate) Done()
func (*ChainUpdate) Removed ¶
func (c *ChainUpdate) Removed() []*PendingBatch
type MessageProcessor ¶
type MessageProcessor struct {
// contains filtered or unexported fields
}
MessageProcessor aggregates messages and emits completed blocks. Messages will be discarded if their blocks are older than lastEmittedNum - reorgThreshold, but otherwise they will be emitted in the order they are completed, even if they are older than the initial block or there are gaps between the last emitted block and this one. Subsequent layers will be used to ensure blocks are emitted in something resembling a sensible order and that reorgs are handled.
func NewMessageProcessor ¶
func NewMessageProcessor(lastEmittedNum int64, reorgThreshold int64, trackedPrefixes []*regexp.Regexp) *MessageProcessor
NewMessageProcessor instantiates a MessageProcessor. lastEmittedNum should indicate the last block number processed by this system for resumption. Blocks older than lastEmittedNum - reorgThreshold will be discarded. trackedPrefixes is a list of regular expressions indicating which messages are of interest to this consumer - use '.*' to get all messages.
func (*MessageProcessor) ProcessCompleteBatch ¶ added in v0.0.38
func (mp *MessageProcessor) ProcessCompleteBatch(pb *PendingBatch)
func (*MessageProcessor) ProcessMessage ¶
func (mp *MessageProcessor) ProcessMessage(m ResumptionMessage) error
ProcessMessage takes in messages and assembles completed blocks of messages.
func (*MessageProcessor) Subscribe ¶
func (mp *MessageProcessor) Subscribe(ch chan<- *PendingBatch) types.Subscription
Subscribe to pending batches.
func (*MessageProcessor) SubscribeReorgs ¶
func (mp *MessageProcessor) SubscribeReorgs(ch chan<- map[int64]types.Hash) types.Subscription
Subscribe to reorg notifications.
func (*MessageProcessor) SubscribeWaiters ¶ added in v1.5.0
func (mp *MessageProcessor) SubscribeWaiters(ch chan<- *Waiter) types.Subscription
Subscribe to the hashes of pending batches.
func (*MessageProcessor) WhyNotReady ¶
func (mp *MessageProcessor) WhyNotReady(hash types.Hash) string
type MessageType ¶
type MessageType byte
const ( BatchType MessageType = iota // BatchType.$Hash SubBatchHeaderType // SubBatchHeaderType.$Hash.$BatchId SubBatchMsgType // SubBatchMsgType.$hash.$BatchId.$Index BatchMsgType // BatchMsgType.$hash./path/ BatchDeleteMsgType // BatchDeleteMsgType.$hash./path/ ReorgType // ReorgType.$Hash ReorgCompleteType // ReorgCompleteType.$Hash PingType MessageType = 0xff )
func (MessageType) GetKey ¶
func (mt MessageType) GetKey(components ...[]byte) []byte
type OrderedMessageProcessor ¶
type OrderedMessageProcessor struct {
// contains filtered or unexported fields
}
func (*OrderedMessageProcessor) Close ¶
func (omp *OrderedMessageProcessor) Close()
func (*OrderedMessageProcessor) HandlePendingBatch ¶
func (omp *OrderedMessageProcessor) HandlePendingBatch(pb *PendingBatch, reorg bool)
func (*OrderedMessageProcessor) HandleReorg ¶
func (omp *OrderedMessageProcessor) HandleReorg(num int64, hash types.Hash)
func (*OrderedMessageProcessor) ProcessCompleteBatch ¶ added in v0.0.38
func (omp *OrderedMessageProcessor) ProcessCompleteBatch(pb *PendingBatch)
func (*OrderedMessageProcessor) ProcessMessage ¶
func (omp *OrderedMessageProcessor) ProcessMessage(m ResumptionMessage) error
func (*OrderedMessageProcessor) Subscribe ¶
func (omp *OrderedMessageProcessor) Subscribe(ch interface{}) types.Subscription
Subscribe enables subscribing to either ordered chain updates or unordered pending batches. Calling Subscribe on a chan *ChainUpdate will return a subscription for ordered chain updates. Calling Subscribe on a chan *PendingBatch will return a subscription for unordered pending batches.
func (*OrderedMessageProcessor) SubscribeReorg ¶
func (omp *OrderedMessageProcessor) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
func (*OrderedMessageProcessor) WhyNotReady ¶
func (omp *OrderedMessageProcessor) WhyNotReady(hash types.Hash) string
type PendingBatch ¶
type PendingBatch struct { Number int64 Weight *big.Int ParentHash types.Hash Hash types.Hash Values map[string][]byte Deletes map[string]struct{} // contains filtered or unexported fields }
func (*PendingBatch) ApplyMessage ¶
func (pb *PendingBatch) ApplyMessage(m ResumptionMessage) bool
func (*PendingBatch) DecPrefixCounter ¶
func (pb *PendingBatch) DecPrefixCounter(p string) bool
func (*PendingBatch) Done ¶
func (pb *PendingBatch) Done()
Done should be called on a pending batch after it has been applied.
func (*PendingBatch) Ready ¶
func (pb *PendingBatch) Ready() bool
func (*PendingBatch) Resumption ¶
func (pb *PendingBatch) Resumption() string
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func (*Producer) Reorg ¶
Reorg should be called for large reorgs (> reorgThreshold). The number and hash should correspond to the common ancestor between the two reorged chains.
type ResumptionMessage ¶
type SubBatchRecord ¶
type SubBatchRecord struct { Delete []string `avro:"delete"` Updates map[string][]byte `avro:"updates"` }
func (*SubBatchRecord) EncodeAvro ¶
func (b *SubBatchRecord) EncodeAvro() []byte