log

package
v0.0.2
Warning: This package is not in the latest version of its module.
Published: Oct 11, 2021 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ListenerJobID

func ListenerJobID(listener Listener) interface{}

ListenerJobID returns the appropriate job ID for a listener.
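
The selection rule is not spelled out here. A minimal sketch of one plausible reading, assuming V2 jobs are identified by JobIDV2 and all other jobs by JobID, is shown below. The names jobSource, pickJobID and stubListener are hypothetical stand-ins, and the V1 ID is a plain string rather than models.JobID so that the example compiles on its own.

```go
package main

import "fmt"

// jobSource is a hypothetical, trimmed-down stand-in for the Listener
// interface. The V1 ID is a string here instead of models.JobID.
type jobSource interface {
	JobID() string
	JobIDV2() int32
	IsV2Job() bool
}

// pickJobID applies an assumed rule: V2 jobs are identified by their int32
// ID, every other job by its V1 ID. The interface{} return type matches the
// signature of ListenerJobID.
func pickJobID(l jobSource) interface{} {
	if l.IsV2Job() {
		return l.JobIDV2()
	}
	return l.JobID()
}

// stubListener is a test double used only for this illustration.
type stubListener struct {
	v1   string
	v2   int32
	isV2 bool
}

func (s stubListener) JobID() string  { return s.v1 }
func (s stubListener) JobIDV2() int32 { return s.v2 }
func (s stubListener) IsV2Job() bool  { return s.isV2 }

func main() {
	fmt.Println(pickJobID(stubListener{v1: "abc"}))
	fmt.Println(pickJobID(stubListener{v2: 42, isV2: true}))
}
```

Because the result is an interface{}, callers that need the concrete ID would have to type-switch on string-like versus int32 values.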

func NewBroadcaster

func NewBroadcaster(orm ORM, ethClient eth.Client, config Config, highestSavedHead *models.Head) *broadcaster

NewBroadcaster creates a new instance of the broadcaster.

func NewORM

func NewORM(db *gorm.DB) *orm

Types

type Broadcast

type Broadcast interface {
	DecodedLog() interface{}
	RawLog() types.Log
	SetDecodedLog(interface{})
	String() string
	LatestBlockNumber() uint64
	LatestBlockHash() common.Hash
	JobID() interface{}
}

The Broadcast type wraps a types.Log and provides additional functionality for determining whether the log has been consumed and for marking it as consumed.

func NewLogBroadcast

func NewLogBroadcast(rawLog types.Log) Broadcast

type Broadcaster

type Broadcaster interface {
	utils.DependentAwaiter
	service.Service
	httypes.HeadTrackable
	IsConnected() bool
	Register(listener Listener, opts ListenerOpts) (unsubscribe func())
	LatestHead() *models.Head
	TrackedAddressesCount() uint32
	// DB interactions
	WasAlreadyConsumed(db *gorm.DB, lb Broadcast) (bool, error)
	MarkConsumed(db *gorm.DB, lb Broadcast) error
}

The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.
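
The multiplexing idea can be illustrated with a small, self-contained sketch. It is not the package's implementation: event, hub, register, dispatch and trackedAddresses are hypothetical names, contract addresses are plain strings, and the single connection is modelled as a slice of events. It is also not safe for concurrent use.

```go
package main

import "fmt"

// event is a hypothetical stand-in for types.Log: an emitting contract
// address and a payload.
type event struct {
	Contract string
	Data     string
}

// hub fans out events from one shared stream to per-contract subscribers.
type hub struct {
	nextID int
	subs   map[string]map[int]func(event)
}

func newHub() *hub {
	return &hub{subs: make(map[string]map[int]func(event))}
}

// register adds a handler for one contract and returns an unsubscribe
// function, mirroring the shape of Broadcaster.Register.
func (h *hub) register(contract string, handler func(event)) (unsubscribe func()) {
	id := h.nextID
	h.nextID++
	if h.subs[contract] == nil {
		h.subs[contract] = make(map[int]func(event))
	}
	h.subs[contract][id] = handler
	return func() {
		delete(h.subs[contract], id)
		if len(h.subs[contract]) == 0 {
			delete(h.subs, contract)
		}
	}
}

// dispatch forwards one event from the shared stream to every handler
// registered for the emitting contract.
func (h *hub) dispatch(e event) {
	for _, handler := range h.subs[e.Contract] {
		handler(e)
	}
}

// trackedAddresses reports how many distinct contracts have subscribers.
func (h *hub) trackedAddresses() int { return len(h.subs) }

func main() {
	h := newHub()
	var seenA, seenB []string
	unsubA := h.register("0xA", func(e event) { seenA = append(seenA, e.Data) })
	h.register("0xB", func(e event) { seenB = append(seenB, e.Data) })

	// One stream carries events for every contract.
	stream := []event{{"0xA", "a1"}, {"0xB", "b1"}, {"0xA", "a2"}}
	for _, e := range stream {
		h.dispatch(e)
	}
	unsubA()
	h.dispatch(event{"0xA", "a3"}) // no subscribers are left for 0xA

	fmt.Println(seenA, seenB, h.trackedAddresses())
}
```

Keying subscribers by contract address keeps dispatch to a single map lookup per event, and dropping an address once its last subscriber leaves is what would let the shared subscription stop tracking it.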

If the node crashes or restarts, logs are backfilled starting from the latest head saved in the DB for every subscriber that was added before all dependents of the LogBroadcaster are done.

If a subscriber is added after the LogBroadcaster has performed the initial backfill, it is likely that its backfill will only have a depth of 1 (from the latest head).

Backfilled logs and new logs alike are only sent once they have reached the NumConfirmations configured for the given subscriber.
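
The confirmation gate can be sketched as follows. The counting rule is an assumption, not something this page states: the block containing the log is treated as its first confirmation, so a log in block B has n confirmations once the head has reached block B+n-1. The names pendingLog, isConfirmed and split are hypothetical.

```go
package main

import "fmt"

// pendingLog is a hypothetical stand-in for a log waiting to be delivered.
type pendingLog struct {
	BlockNumber uint64
	Data        string
}

// isConfirmed applies the assumed counting rule described above. It is
// written as B+n <= head+1 rather than B+n-1 <= head so that the unsigned
// arithmetic cannot underflow when n is 0.
func isConfirmed(logBlock, head, numConfirmations uint64) bool {
	return logBlock+numConfirmations <= head+1
}

// split partitions pending logs into those that may be sent to a subscriber
// now and those that must wait for more blocks.
func split(pending []pendingLog, head, numConfirmations uint64) (ready, waiting []pendingLog) {
	for _, l := range pending {
		if isConfirmed(l.BlockNumber, head, numConfirmations) {
			ready = append(ready, l)
		} else {
			waiting = append(waiting, l)
		}
	}
	return ready, waiting
}

func main() {
	pending := []pendingLog{{10, "backfilled"}, {12, "new"}}
	ready, waiting := split(pending, 12, 3)
	fmt.Println("ready:", ready, "waiting:", waiting)
}
```

With the head at block 12 and three confirmations required, the log from block 10 is released while the log from block 12 waits for two more blocks.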

type Config

type Config interface {
	BlockBackfillDepth() uint64
	EthFinalityDepth() uint
	EthLogBackfillBatchSize() uint32
}

type Listener

type Listener interface {
	HandleLog(b Broadcast)
	JobID() models.JobID
	JobIDV2() int32
	IsV2Job() bool
}

The Listener responds to log events through HandleLog.
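
A minimal sketch of a HandleLog implementation follows. It assumes a common pattern of checking whether a broadcast was already consumed before acting on it and marking it consumed afterwards; whether real listeners do exactly this is an assumption. The types broadcast, consumptionStore, memStore, textBroadcast and countingListener are hypothetical stand-ins that drop the *gorm.DB argument and key broadcasts by their String() value, whereas the ORM interface keys on block hash, log index and job ID.

```go
package main

import "fmt"

// broadcast is a hypothetical, trimmed-down stand-in for Broadcast.
type broadcast interface {
	String() string
}

// consumptionStore stands in for the WasAlreadyConsumed / MarkConsumed pair
// on Broadcaster, without the *gorm.DB argument.
type consumptionStore interface {
	WasAlreadyConsumed(b broadcast) (bool, error)
	MarkConsumed(b broadcast) error
}

// memStore is an in-memory consumptionStore used only for illustration.
type memStore struct{ seen map[string]bool }

func (m *memStore) WasAlreadyConsumed(b broadcast) (bool, error) {
	return m.seen[b.String()], nil
}

func (m *memStore) MarkConsumed(b broadcast) error {
	m.seen[b.String()] = true
	return nil
}

// textBroadcast is a trivial broadcast identified by its text.
type textBroadcast string

func (t textBroadcast) String() string { return string(t) }

// countingListener counts each distinct broadcast exactly once.
type countingListener struct {
	store   consumptionStore
	handled int
}

// HandleLog skips broadcasts that were already consumed, so a redelivered
// log (for example after a backfill) is not processed twice.
func (l *countingListener) HandleLog(b broadcast) {
	consumed, err := l.store.WasAlreadyConsumed(b)
	if err != nil || consumed {
		return
	}
	l.handled++
	if err := l.store.MarkConsumed(b); err != nil {
		fmt.Println("could not mark broadcast as consumed:", err)
	}
}

func main() {
	l := &countingListener{store: &memStore{seen: map[string]bool{}}}
	l.HandleLog(textBroadcast("block 7, log 0"))
	l.HandleLog(textBroadcast("block 7, log 0")) // redelivered
	l.HandleLog(textBroadcast("block 8, log 2"))
	fmt.Println("handled:", l.handled)
}
```

In a database-backed version, doing the work and marking the broadcast consumed in the same transaction would keep the two from drifting apart after a crash.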

type ListenerOpts

type ListenerOpts struct {
	Contract common.Address

	// Event types to receive, with a value filter for each field in the event.
	// No filter or an empty filter for a given field position means all values are allowed.
	// Each key should be the result of an AbigenLog.Topic() call.
	LogsWithTopics map[common.Hash][][]Topic

	ParseLog ParseLogFunc

	// Minimum number of block confirmations before the log is received
	NumConfirmations uint64
}
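
The LogsWithTopics filter semantics can be sketched with plain strings in place of hashes. The example assumes the usual Ethereum layout in which a log's first topic is the event signature and the following topics are the indexed fields, so that filter position i applies to log topic i+1; that mapping is an assumption about this package. The names topic, matches and contains are hypothetical.

```go
package main

import "fmt"

// topic is a hypothetical string stand-in for the package's Topic type.
type topic string

// contains reports whether want appears in allowed.
func contains(allowed []topic, want topic) bool {
	for _, t := range allowed {
		if t == want {
			return true
		}
	}
	return false
}

// matches reports whether a log with the given topics passes the filters.
// filters is keyed by event signature; each value holds one list of allowed
// values per indexed field. A missing or empty list allows every value.
func matches(filters map[topic][][]topic, logTopics []topic) bool {
	if len(logTopics) == 0 {
		return false
	}
	fieldFilters, ok := filters[logTopics[0]]
	if !ok {
		return false // this event type was not requested
	}
	for i, allowed := range fieldFilters {
		if len(allowed) == 0 {
			continue // empty filter: all values allowed at this position
		}
		pos := i + 1 // skip the event signature topic
		if pos >= len(logTopics) || !contains(allowed, logTopics[pos]) {
			return false
		}
	}
	return true
}

func main() {
	// Receive Transfer events for any sender, but only to alice or bob.
	filters := map[topic][][]topic{
		"Transfer": {{}, {"alice", "bob"}},
	}
	fmt.Println(matches(filters, []topic{"Transfer", "carol", "alice"}))
	fmt.Println(matches(filters, []topic{"Transfer", "alice", "carol"}))
	fmt.Println(matches(filters, []topic{"Approval", "carol", "alice"}))
}
```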

type NullBroadcaster

type NullBroadcaster struct{ ErrMsg string }

func (*NullBroadcaster) AddDependents

func (n *NullBroadcaster) AddDependents(int)

func (*NullBroadcaster) AwaitDependents

func (n *NullBroadcaster) AwaitDependents() <-chan struct{}

func (*NullBroadcaster) Close

func (n *NullBroadcaster) Close() error

func (*NullBroadcaster) Connect

func (n *NullBroadcaster) Connect(*models.Head) error

func (*NullBroadcaster) DependentReady

func (n *NullBroadcaster) DependentReady()

func (*NullBroadcaster) Healthy

func (n *NullBroadcaster) Healthy() error

func (*NullBroadcaster) IsConnected

func (n *NullBroadcaster) IsConnected() bool

func (*NullBroadcaster) LatestHead

func (n *NullBroadcaster) LatestHead() *models.Head

func (*NullBroadcaster) MarkConsumed

func (n *NullBroadcaster) MarkConsumed(db *gorm.DB, lb Broadcast) error

func (*NullBroadcaster) OnNewLongestChain

func (n *NullBroadcaster) OnNewLongestChain(context.Context, models.Head)

func (*NullBroadcaster) Ready

func (n *NullBroadcaster) Ready() error

func (*NullBroadcaster) Register

func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())

func (*NullBroadcaster) Start

func (n *NullBroadcaster) Start() error

func (*NullBroadcaster) TrackedAddressesCount

func (n *NullBroadcaster) TrackedAddressesCount() uint32

func (*NullBroadcaster) WasAlreadyConsumed

func (n *NullBroadcaster) WasAlreadyConsumed(db *gorm.DB, lb Broadcast) (bool, error)

type ORM

type ORM interface {
	WasBroadcastConsumed(tx *gorm.DB, blockHash common.Hash, logIndex uint, jobID interface{}) (bool, error)
	MarkBroadcastConsumed(tx *gorm.DB, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID interface{}) error
}

type ParseLogFunc

type ParseLogFunc func(log types.Log) (generated.AbigenLog, error)

type Topic

type Topic common.Hash
