Documentation ¶
Overview ¶
Package events provides data structures and functions to stream and store events (logs) from the Ethereum blockchain.
Messages in the event stream have three possible actions:
Append a Block Rollback to a given Block (happens on chain reorganization) SetNext to a given block number.
Depending on the event filter used to retrieve logs, the stream may not contain logs for every block. The SetNext message allows the stream to signal that blocks have been read, but no events found. The Append and Rollback messages are straightforward.
The central interfaces are Streamer and EventLog (which is also a Streamer). A Streamer knows how to stream events. The ChainStreamer implements this to stream from the Ethereum blockchain by overlapping eth_getLogs calls, sending Rollback messages when a chain reorganization is detected. An EventLog implements both the receiving methods of a stream (Append, Rollback, SetNext), and the Streamer interface to emit the stored events.
Index ¶
- Constants
- func AddTransactionData(ctx context.Context, client *ethclient.Client, bs *BlockSlice) error
- func BigIntFromString(s string) (*big.Int, error)
- func BigIntToString(x *big.Int) string
- func BlockSliceToProto(bs *BlockSlice) *epb.BlockSlice
- func BlockToProto(b *Block) *epb.Block
- func EventToProto(e *Event) *epb.Event
- func FilterQueryFromProto(pb *epb.FilterQuery) (ethereum.FilterQuery, error)
- func FilterQueryToProto(q *ethereum.FilterQuery) *epb.FilterQuery
- func MatchBlocks(new, old *BlockSlice) (bool, uint64, error)
- type Action
- type Block
- type BlockSlice
- type CanceledError
- type ChainStreamer
- type Event
- type EventLog
- type InMemoryEventLog
- func (l *InMemoryEventLog) Append(b *Block) error
- func (l *InMemoryEventLog) Close() error
- func (l *InMemoryEventLog) Filter() ethereum.FilterQuery
- func (l *InMemoryEventLog) FirstBlock() uint64
- func (l *InMemoryEventLog) NextBlock() uint64
- func (l *InMemoryEventLog) Rollback(n uint64) error
- func (l *InMemoryEventLog) SetNext(n uint64) error
- func (l *InMemoryEventLog) Stream(done chan struct{}, from uint64) (*Subscription, error)
- func (l *InMemoryEventLog) ToProto() *epb.EventLogFile
- type LiveEventLog
- type Message
- type Streamer
- type Subscription
Constants ¶
const DefaultBatchOverlap uint64 = 10 // overlap between polls
const DefaultFetchBatchSize uint64 = 2000 // size of call to getLogs
const DefaultPollInterval int = 15 // seconds
const MaxEventlogSize uint64 = 1024 // blocks
Variables ¶
This section is empty.
Functions ¶
func AddTransactionData ¶ added in v0.2.0
func BigIntToString ¶
func BlockSliceToProto ¶
func BlockSliceToProto(bs *BlockSlice) *epb.BlockSlice
func BlockToProto ¶
message Block { uint64 number = 1; bytes hash = 2; repeated Event events = 3; }
func EventToProto ¶
EventToProto creates a proto representation of an Event.
func FilterQueryFromProto ¶
func FilterQueryFromProto(pb *epb.FilterQuery) (ethereum.FilterQuery, error)
func FilterQueryToProto ¶
func FilterQueryToProto(q *ethereum.FilterQuery) *epb.FilterQuery
func MatchBlocks ¶
func MatchBlocks(new, old *BlockSlice) (bool, uint64, error)
MatchHistory compares the new blocks with the old where they overlap. It returns true if all blocks in the overlap have the same hash. The second return value indicates the latest block that agrees. If no block agrees, it returns (false, 0, nil).
Types ¶
type BlockSlice ¶
func BlockSliceFromProto ¶
func BlockSliceFromProto(pb *epb.BlockSlice) (*BlockSlice, error)
func EmptyBlockSlice ¶
func EmptyBlockSlice(from uint64) *BlockSlice
func GetLogs ¶
func GetLogs(ctx context.Context, client *ethclient.Client, q *ethereum.FilterQuery) (*BlockSlice, error)
GetLogs returns a batch of logs matching a query. The blocks in the block are guaranteed to be sorted by increasing Number, and the events therein by Index.
func (*BlockSlice) Append ¶
func (b *BlockSlice) Append(blk *Block) error
func (*BlockSlice) Concat ¶
func (b *BlockSlice) Concat(other *BlockSlice) error
func (*BlockSlice) DeleteBeforeBlock ¶
func (b *BlockSlice) DeleteBeforeBlock(n uint64)
func (*BlockSlice) DeleteFromBlock ¶
func (b *BlockSlice) DeleteFromBlock(n uint64)
func (*BlockSlice) Extend ¶
func (b *BlockSlice) Extend(n uint64) error
func (*BlockSlice) Rollback ¶
func (b *BlockSlice) Rollback(n uint64) error
type CanceledError ¶
type CanceledError string
const Canceled CanceledError = CanceledError("")
func (CanceledError) Error ¶
func (CanceledError) Error() string
type ChainStreamer ¶
type ChainStreamer struct { Ctx context.Context Url string Filter ethereum.FilterQuery FetchBatchSize uint64 BatchOverlap uint64 FetchTxDetails bool }
ChainStreamer implements a Streamer for the Ethereum blockchain.
func (*ChainStreamer) Stream ¶
func (cr *ChainStreamer) Stream(done chan struct{}, from uint64) (*Subscription, error)
type Event ¶
type Event struct { Address common.Address Topics []common.Hash Data []byte BlockNumber uint64 BlockHash common.Hash Index uint64 // index of log in block TxHash common.Hash TxIndex uint64 // index of tx in block TxData []byte TxValue *big.Int TxFrom common.Address TxGas uint64 }
func EventFromProto ¶
EventFromProto creates an Event from its proto representation.
type EventLog ¶
type EventLog interface { Streamer Append(*Block) error Rollback(uint64) error SetNext(uint64) error FirstBlock() uint64 NextBlock() uint64 Filter() ethereum.FilterQuery Close() error }
EventLog represents a sequence of events matching a filter.
type InMemoryEventLog ¶
type InMemoryEventLog struct {
// contains filtered or unexported fields
}
InMemoryEventLog is an in-memory implementation of the EventLog interface.
func InMemoryEventLogFromProto ¶
func InMemoryEventLogFromProto(pb *epb.EventLogFile) (*InMemoryEventLog, error)
func NewInMemoryEventLog ¶
func NewInMemoryEventLog(from uint64, filter ethereum.FilterQuery) *InMemoryEventLog
func (*InMemoryEventLog) Append ¶
func (l *InMemoryEventLog) Append(b *Block) error
func (*InMemoryEventLog) Close ¶
func (l *InMemoryEventLog) Close() error
func (*InMemoryEventLog) Filter ¶
func (l *InMemoryEventLog) Filter() ethereum.FilterQuery
func (*InMemoryEventLog) FirstBlock ¶
func (l *InMemoryEventLog) FirstBlock() uint64
func (*InMemoryEventLog) NextBlock ¶
func (l *InMemoryEventLog) NextBlock() uint64
func (*InMemoryEventLog) Rollback ¶
func (l *InMemoryEventLog) Rollback(n uint64) error
func (*InMemoryEventLog) SetNext ¶
func (l *InMemoryEventLog) SetNext(n uint64) error
func (*InMemoryEventLog) Stream ¶
func (l *InMemoryEventLog) Stream(done chan struct{}, from uint64) (*Subscription, error)
func (*InMemoryEventLog) ToProto ¶
func (l *InMemoryEventLog) ToProto() *epb.EventLogFile
type LiveEventLog ¶
type LiveEventLog struct {
// contains filtered or unexported fields
}
LiveEventLog combines an EventLog and a ChainStreamer to make a new Streamer that streams first from the EventLog, and then from the ChainStreamer. When streaming from the ChainStreamer the messages are both sent to the EventLog and the subscriber.
func NewLiveEventLog ¶
func NewLiveEventLog(e EventLog, s ChainStreamer) *LiveEventLog
func (*LiveEventLog) Stream ¶
func (l *LiveEventLog) Stream(done chan struct{}, from uint64) (*Subscription, error)
type Streamer ¶
type Streamer interface {
Stream(done chan struct{}, from uint64) (*Subscription, error)
}