log

package
v2.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2024 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroadcaster

func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, highestSavedHead *evmtypes.Head, mailMon *mailbox.Monitor) *broadcaster

NewBroadcaster creates a new instance of the broadcaster

func NewORM

func NewORM(ds sqlutil.DataSource, evmChainID big.Int) *orm

Types

type AbigenContract

type AbigenContract interface {
	Address() common.Address
	ParseLog(log types.Log) (generated.AbigenLog, error)
}

type Broadcast

type Broadcast interface {
	DecodedLog() interface{}
	RawLog() types.Log
	String() string
	LatestBlockNumber() uint64
	LatestBlockHash() common.Hash
	ReceiptsRoot() common.Hash
	TransactionsRoot() common.Hash
	StateRoot() common.Hash
	JobID() int32
	EVMChainID() big.Int
}

The Broadcast type wraps a types.Log but provides additional functionality for determining whether or not the log has been consumed and for marking the log as consumed

func NewLogBroadcast

func NewLogBroadcast(rawLog types.Log, evmChainID big.Int, decodedLog interface{}) Broadcast

type Broadcaster

type Broadcaster interface {
	utils.DependentAwaiter
	services.Service
	httypes.HeadTrackable

	// ReplayFromBlock enqueues a replay from the provided block number. If forceBroadcast is
	// set to true, the broadcaster will broadcast logs that were already marked consumed
	// previously by any subscribers.
	ReplayFromBlock(number int64, forceBroadcast bool)

	IsConnected() bool
	Register(listener Listener, opts ListenerOpts) (unsubscribe func())

	WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)
	// ds is optional
	MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error
}

The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.

In case of node crash and/or restart, the logs will be backfilled for subscribers that are added before all dependents of LogBroadcaster are done.

The backfill starts from the earliest block of either:

  • Latest DB head minus BlockBackfillDepth and the maximum number of confirmations.
  • Earliest pending or unconsumed log broadcast from DB.

If a subscriber is added after the LogBroadcaster does the initial backfill, then it's possible/likely that the backfill fill only have depth: 1 (from latest head)

Of course, these backfilled logs + any new logs will only be sent after the NumConfirmations for given subscriber.

type BroadcasterInTest

type BroadcasterInTest interface {
	Broadcaster
	BackfillBlockNumber() sql.NullInt64
	TrackedAddressesCount() uint32
	// Pause pauses the eventLoop until Resume is called.
	Pause()
	// Resume resumes the eventLoop after calling Pause.
	Resume()
	LogsFromBlock(bh common.Hash) int
}

type Config

type Config interface {
	BlockBackfillDepth() uint64
	BlockBackfillSkip() bool
	FinalityDepth() uint32
	LogBackfillBatchSize() uint32
}

type Listener

type Listener interface {
	HandleLog(ctx context.Context, b Broadcast)
	JobID() int32
}

The Listener responds to log events through HandleLog.

type ListenerOpts

type ListenerOpts struct {
	Contract common.Address

	// Event types to receive, with value filter for each field in the event
	// No filter or an empty filter for a given field position mean: all values allowed
	// the key should be a result of AbigenLog.Topic() call
	// topic => topicValueFilters
	LogsWithTopics map[common.Hash][][]Topic

	ParseLog ParseLogFunc

	// Minimum number of block confirmations before the log is received
	MinIncomingConfirmations uint32

	// ReplayStartedCallback is called by the log broadcaster once a replay request is received.
	ReplayStartedCallback func()
}

type LogBroadcast

type LogBroadcast struct {
	BlockHash common.Hash
	Consumed  bool
	LogIndex  uint
	JobID     int32
}

LogBroadcast - data from log_broadcasts table columns

func (LogBroadcast) AsKey

func (b LogBroadcast) AsKey() LogBroadcastAsKey

type LogBroadcastAsKey

type LogBroadcastAsKey struct {
	BlockHash common.Hash
	LogIndex  uint
	JobId     int32
}

LogBroadcastAsKey - used as key in a map to filter out already consumed logs

func NewLogBroadcastAsKey

func NewLogBroadcastAsKey(log types.Log, listener Listener) LogBroadcastAsKey

type NullBroadcaster

type NullBroadcaster struct{ ErrMsg string }

func (*NullBroadcaster) AddDependents

func (n *NullBroadcaster) AddDependents(int)

func (*NullBroadcaster) AwaitDependents

func (n *NullBroadcaster) AwaitDependents() <-chan struct{}

func (*NullBroadcaster) BackfillBlockNumber

func (n *NullBroadcaster) BackfillBlockNumber() sql.NullInt64

func (*NullBroadcaster) Close

func (n *NullBroadcaster) Close() error

func (*NullBroadcaster) DependentReady

func (n *NullBroadcaster) DependentReady()

DependentReady does noop for NullBroadcaster.

func (*NullBroadcaster) HealthReport

func (n *NullBroadcaster) HealthReport() map[string]error

func (*NullBroadcaster) IsConnected

func (n *NullBroadcaster) IsConnected() bool

func (*NullBroadcaster) LogsFromBlock

func (n *NullBroadcaster) LogsFromBlock(common.Hash) int

func (*NullBroadcaster) MarkConsumed

func (n *NullBroadcaster) MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error

func (*NullBroadcaster) Name

func (n *NullBroadcaster) Name() string

func (*NullBroadcaster) OnNewLongestChain

func (n *NullBroadcaster) OnNewLongestChain(context.Context, *evmtypes.Head)

func (*NullBroadcaster) Pause

func (n *NullBroadcaster) Pause()

func (*NullBroadcaster) Ready

func (n *NullBroadcaster) Ready() error

func (*NullBroadcaster) Register

func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())

func (*NullBroadcaster) ReplayFromBlock

func (n *NullBroadcaster) ReplayFromBlock(number int64, forceBroadcast bool)

ReplayFromBlock implements the Broadcaster interface.

func (*NullBroadcaster) Resume

func (n *NullBroadcaster) Resume()

func (*NullBroadcaster) Start

Start does noop for NullBroadcaster.

func (*NullBroadcaster) TrackedAddressesCount

func (n *NullBroadcaster) TrackedAddressesCount() uint32

func (*NullBroadcaster) WasAlreadyConsumed

func (n *NullBroadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)

type ORM

type ORM interface {
	// FindBroadcasts returns broadcasts for a range of block numbers, both consumed and unconsumed.
	FindBroadcasts(ctx context.Context, fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error)
	// CreateBroadcast inserts an unconsumed log broadcast for jobID.
	CreateBroadcast(ctx context.Context, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32) error
	// WasBroadcastConsumed returns true if jobID consumed the log broadcast.
	WasBroadcastConsumed(ctx context.Context, blockHash common.Hash, logIndex uint, jobID int32) (bool, error)
	// MarkBroadcastConsumed marks the log broadcast as consumed by jobID.
	MarkBroadcastConsumed(ctx context.Context, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32) error
	// MarkBroadcastsUnconsumed marks all log broadcasts from all jobs on or after fromBlock as
	// unconsumed.
	MarkBroadcastsUnconsumed(ctx context.Context, fromBlock int64) error

	// SetPendingMinBlock sets the minimum block number for which there are pending broadcasts in the pool, or nil if empty.
	SetPendingMinBlock(ctx context.Context, blockNum *int64) error
	// GetPendingMinBlock returns the minimum block number for which there were pending broadcasts in the pool, or nil if it was empty.
	GetPendingMinBlock(ctx context.Context) (blockNumber *int64, err error)

	// Reinitialize cleans up the database by removing any unconsumed broadcasts, then updating (if necessary) and
	// returning the pending minimum block number.
	Reinitialize(ctx context.Context) (blockNumber *int64, err error)

	WithDataSource(sqlutil.DataSource) ORM
}

ORM is the interface for log broadcasts.

  • Unconsumed broadcasts are created just before notifying subscribers, who are responsible for marking them consumed.
  • Pending broadcast block numbers are synced to the min from the pool (or deleted when empty)
  • On reboot, backfill considers the min block number from unconsumed and pending broadcasts. Additionally, unconsumed entries are removed and the pending broadcasts number updated.

type ParseLogFunc

type ParseLogFunc func(log types.Log) (generated.AbigenLog, error)

type Topic

type Topic common.Hash

type Uint64

type Uint64 uint64

func (Uint64) Compare

func (a Uint64) Compare(b heaps.Item) int

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL