log

package
v1.3.0-rc5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2022 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroadcaster

func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head) *broadcaster

NewBroadcaster creates a new instance of the broadcaster

func NewORM

func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig, evmChainID big.Int) *orm

Types

type Broadcast

type Broadcast interface {
	// DecodedLog returns the decoded form of the log.
	// NOTE(review): presumably the decodedLog value given to NewLogBroadcast — confirm.
	DecodedLog() interface{}
	// RawLog returns the log as a types.Log.
	RawLog() types.Log
	// String returns a textual representation of the broadcast.
	String() string
	// LatestBlockNumber and LatestBlockHash presumably describe the most recent block
	// known when this broadcast was produced — TODO confirm against the implementation.
	LatestBlockNumber() uint64
	LatestBlockHash() common.Hash
	// JobID returns a job identifier.
	// NOTE(review): presumably the job of the Listener receiving the broadcast — confirm.
	JobID() int32
	// EVMChainID returns the EVM chain ID associated with the broadcast.
	// NOTE(review): presumably the evmChainID given to NewLogBroadcast — confirm.
	EVMChainID() big.Int
}

The Broadcast type wraps a types.Log but provides additional functionality for determining whether or not the log has been consumed and for marking the log as consumed.

func NewLogBroadcast

func NewLogBroadcast(rawLog types.Log, evmChainID big.Int, decodedLog interface{}) Broadcast

type Broadcaster

type Broadcaster interface {
	utils.DependentAwaiter
	services.ServiceCtx
	httypes.HeadTrackable

	// ReplayFromBlock enqueues a replay from the provided block number. If forceBroadcast is
	// set to true, the broadcaster will broadcast logs that were already marked consumed
	// previously by any subscribers.
	ReplayFromBlock(number int64, forceBroadcast bool)

	// IsConnected reports the broadcaster's connection state.
	// NOTE(review): exactly which connection is meant is not visible here — confirm.
	IsConnected() bool
	// Register adds listener with the given options and returns a function that
	// unsubscribes it again.
	Register(listener Listener, opts ListenerOpts) (unsubscribe func())

	// WasAlreadyConsumed reports whether lb has already been consumed; qopts are
	// optional pg query options.
	WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error)
	// MarkConsumed marks lb as consumed; qopts are optional pg query options.
	MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error
}

The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.

In case of node crash and/or restart, the logs will be backfilled for subscribers that are added before all dependents of LogBroadcaster are done.

The backfill starts from the earliest block of either:

  • Latest DB head minus BlockBackfillDepth and the maximum number of confirmations.
  • Earliest pending or unconsumed log broadcast from DB.

If a subscriber is added after the LogBroadcaster does the initial backfill, then it's possible/likely that the backfill will only have a depth of 1 (from the latest head).

Of course, these backfilled logs, along with any new logs, will only be sent after the NumConfirmations for the given subscriber.

type BroadcasterInTest

// BroadcasterInTest extends Broadcaster with inspection and event-loop control
// methods. NOTE(review): intended for tests, judging by the name — confirm.
type BroadcasterInTest interface {
	Broadcaster
	// BackfillBlockNumber returns a nullable block number.
	// NOTE(review): presumably the block the backfill started from — confirm.
	BackfillBlockNumber() null.Int64
	// TrackedAddressesCount returns a count of tracked addresses.
	// NOTE(review): presumably contract addresses with registered listeners — confirm.
	TrackedAddressesCount() uint32
	// Pause pauses the eventLoop until Resume is called.
	Pause()
	// Resume resumes the eventLoop after calling Pause.
	Resume()
	// LogsFromBlock returns a number of logs for the block with hash bh.
	// NOTE(review): which set of logs is counted is not visible here — confirm.
	LogsFromBlock(bh common.Hash) int
}

type Config

// Config is the configuration consumed by NewBroadcaster.
type Config interface {
	// BlockBackfillDepth is subtracted, together with the maximum number of
	// confirmations, from the latest DB head to obtain one candidate block for the
	// start of the backfill (see the Broadcaster documentation).
	BlockBackfillDepth() uint64
	// NOTE(review): presumably disables the backfill when true — confirm.
	BlockBackfillSkip() bool
	// NOTE(review): presumably the number of blocks after which a block is treated
	// as final — confirm.
	EvmFinalityDepth() uint32
	// NOTE(review): presumably the block-range size used per backfill request — confirm.
	EvmLogBackfillBatchSize() uint32
}

type Listener

type Listener interface {
	// HandleLog is how the Listener responds to a log event; b carries the log.
	HandleLog(b Broadcast)
	// JobID returns the ID of the job this listener belongs to.
	// NOTE(review): presumably the value stored in LogBroadcastAsKey by
	// NewLogBroadcastAsKey — confirm.
	JobID() int32
}

The Listener responds to log events through HandleLog.

type ListenerOpts

type ListenerOpts struct {
	// Address of the contract to receive logs from.
	// NOTE(review): inferred from the field name and type — confirm.
	Contract common.Address

	// Event types to receive, with a value filter for each field in the event.
	// No filter, or an empty filter, for a given field position means that all
	// values are allowed.
	// The key should be the result of an AbigenLog.Topic() call.
	// topic => topicValueFilters
	LogsWithTopics map[common.Hash][][]Topic

	// Function that turns a raw types.Log into a generated.AbigenLog (see ParseLogFunc).
	ParseLog ParseLogFunc

	// Minimum number of block confirmations before the log is received
	MinIncomingConfirmations uint32
}

type LogBroadcast

type LogBroadcast struct {
	// Hash of the block that contains the log.
	BlockHash common.Hash
	// Whether the broadcast has been marked consumed.
	Consumed  bool
	// Index of the log. NOTE(review): presumably its index within the block — confirm.
	LogIndex  uint
	// ID of the job the broadcast belongs to.
	JobID     int32
}

LogBroadcast - data from log_broadcasts table columns

func (LogBroadcast) AsKey

func (b LogBroadcast) AsKey() LogBroadcastAsKey

type LogBroadcastAsKey

type LogBroadcastAsKey struct {
	// Hash of the block that contains the log.
	BlockHash common.Hash
	// Index of the log. NOTE(review): presumably its index within the block — confirm.
	LogIndex  uint
	// ID of the job.
	// NOTE(review): spelled JobId here but JobID in LogBroadcast; renaming would
	// break callers, so the inconsistency is only flagged.
	JobId     int32
}

LogBroadcastAsKey - used as key in a map to filter out already consumed logs

func NewLogBroadcastAsKey

func NewLogBroadcastAsKey(log types.Log, listener Listener) LogBroadcastAsKey

type NullBroadcaster

// NullBroadcaster provides the Broadcaster and BroadcasterInTest method set; its
// documented methods (Start, DependentReady) are no-ops.
// NOTE(review): how ErrMsg is used is not visible here — presumably the message
// for errors returned by its methods; confirm.
type NullBroadcaster struct{ ErrMsg string }

func (*NullBroadcaster) AddDependents

func (n *NullBroadcaster) AddDependents(int)

func (*NullBroadcaster) AwaitDependents

func (n *NullBroadcaster) AwaitDependents() <-chan struct{}

func (*NullBroadcaster) BackfillBlockNumber

func (n *NullBroadcaster) BackfillBlockNumber() null.Int64

func (*NullBroadcaster) Close

func (n *NullBroadcaster) Close() error

func (*NullBroadcaster) DependentReady

func (n *NullBroadcaster) DependentReady()

DependentReady is a no-op for NullBroadcaster.

func (*NullBroadcaster) Healthy

func (n *NullBroadcaster) Healthy() error

func (*NullBroadcaster) IsConnected

func (n *NullBroadcaster) IsConnected() bool

func (*NullBroadcaster) LogsFromBlock

func (n *NullBroadcaster) LogsFromBlock(common.Hash) int

func (*NullBroadcaster) MarkConsumed

func (n *NullBroadcaster) MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error

func (*NullBroadcaster) OnNewLongestChain

func (n *NullBroadcaster) OnNewLongestChain(context.Context, *evmtypes.Head)

func (*NullBroadcaster) Pause

func (n *NullBroadcaster) Pause()

func (*NullBroadcaster) Ready

func (n *NullBroadcaster) Ready() error

func (*NullBroadcaster) Register

func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())

func (*NullBroadcaster) ReplayFromBlock

func (n *NullBroadcaster) ReplayFromBlock(number int64, forceBroadcast bool)

ReplayFromBlock implements the Broadcaster interface.

func (*NullBroadcaster) Resume

func (n *NullBroadcaster) Resume()

func (*NullBroadcaster) Start

Start is a no-op for NullBroadcaster.

func (*NullBroadcaster) TrackedAddressesCount

func (n *NullBroadcaster) TrackedAddressesCount() uint32

func (*NullBroadcaster) WasAlreadyConsumed

func (n *NullBroadcaster) WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error)

type ORM

type ORM interface {
	// FindBroadcasts returns broadcasts for a range of block numbers, both consumed and unconsumed.
	// NOTE(review): whether fromBlockNum and toBlockNum are inclusive is not stated
	// here — confirm against the implementation.
	FindBroadcasts(fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error)
	// CreateBroadcast inserts an unconsumed log broadcast for jobID.
	CreateBroadcast(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error
	// WasBroadcastConsumed returns true if jobID consumed the log broadcast.
	WasBroadcastConsumed(blockHash common.Hash, logIndex uint, jobID int32, qopts ...pg.QOpt) (bool, error)
	// MarkBroadcastConsumed marks the log broadcast as consumed by jobID.
	MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error
	// MarkBroadcastsUnconsumed marks all log broadcasts from all jobs on or after fromBlock as
	// unconsumed.
	MarkBroadcastsUnconsumed(fromBlock int64, qopts ...pg.QOpt) error

	// SetPendingMinBlock sets the minimum block number for which there are pending broadcasts in the pool, or nil if empty.
	SetPendingMinBlock(blockNum *int64, qopts ...pg.QOpt) error
	// GetPendingMinBlock returns the minimum block number for which there were pending broadcasts in the pool, or nil if it was empty.
	GetPendingMinBlock(qopts ...pg.QOpt) (blockNumber *int64, err error)

	// Reinitialize cleans up the database by removing any unconsumed broadcasts, then updating (if necessary) and
	// returning the pending minimum block number.
	// NOTE(review): a nil blockNumber with a nil err presumably means there are no
	// pending broadcasts, mirroring GetPendingMinBlock — confirm.
	Reinitialize(qopts ...pg.QOpt) (blockNumber *int64, err error)
}

ORM is the interface for log broadcasts.

  • Unconsumed broadcasts are created just before notifying subscribers, who are responsible for marking them consumed.
  • Pending broadcast block numbers are synced to the min from the pool (or deleted when empty)
  • On reboot, backfill considers the min block number from unconsumed and pending broadcasts. Additionally, unconsumed entries are removed and the pending broadcasts number updated.

type ParseLogFunc

// ParseLogFunc converts a raw types.Log into a generated.AbigenLog, or returns an
// error. It is the type of ListenerOpts.ParseLog.
type ParseLogFunc func(log types.Log) (generated.AbigenLog, error)

type Topic

// Topic is a common.Hash used as a topic value in the per-field filters of
// ListenerOpts.LogsWithTopics.
type Topic common.Hash

type Uint64

// Uint64 is a uint64 with a Compare method taking a heaps.Item.
// NOTE(review): presumably exists so block numbers can be stored in a heap — confirm.
type Uint64 uint64

func (Uint64) Compare

func (a Uint64) Compare(b heaps.Item) int

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL