Documentation ¶
Index ¶
- func ListenerJobID(listener Listener) interface{}
- func NewBroadcaster(orm ORM, ethClient eth.Client, config Config, highestSavedHead *models.Head) *broadcaster
- func NewORM(db *gorm.DB) *orm
- type Broadcast
- type Broadcaster
- type Config
- type Listener
- type ListenerOpts
- type NullBroadcaster
- func (n *NullBroadcaster) AddDependents(int)
- func (n *NullBroadcaster) AwaitDependents() <-chan struct{}
- func (n *NullBroadcaster) Close() error
- func (n *NullBroadcaster) Connect(*models.Head) error
- func (n *NullBroadcaster) DependentReady()
- func (n *NullBroadcaster) Healthy() error
- func (n *NullBroadcaster) IsConnected() bool
- func (n *NullBroadcaster) LatestHead() *models.Head
- func (n *NullBroadcaster) MarkConsumed(db *gorm.DB, lb Broadcast) error
- func (n *NullBroadcaster) OnNewLongestChain(context.Context, models.Head)
- func (n *NullBroadcaster) Ready() error
- func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())
- func (n *NullBroadcaster) Start() error
- func (n *NullBroadcaster) TrackedAddressesCount() uint32
- func (n *NullBroadcaster) WasAlreadyConsumed(db *gorm.DB, lb Broadcast) (bool, error)
- type ORM
- type ParseLogFunc
- type Topic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ListenerJobID ¶
func ListenerJobID(listener Listener) interface{}
ListenerJobID returns the appropriate job ID for a listener.
Types ¶
type Broadcast ¶
type Broadcast interface {
	DecodedLog() interface{}
	RawLog() types.Log
	SetDecodedLog(interface{})
	String() string
	LatestBlockNumber() uint64
	LatestBlockHash() common.Hash
	JobID() interface{}
}
The Broadcast type wraps a types.Log and provides additional functionality for determining whether the log has been consumed and for marking the log as consumed.
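The consumed/not-consumed bookkeeping described above can be sketched with an in-memory store. This is a minimal sketch under stated assumptions, not the package's implementation: `ConsumedStore`, `consumedKey`, and `HandleOnce` are hypothetical names, the key fields are a guess at what might identify a log for a job, and the real `WasAlreadyConsumed` and `MarkConsumed` methods take a `*gorm.DB` and a `Broadcast`.

```go
package main

import (
	"fmt"
	"sync"
)

// consumedKey identifies one log for one job. The fields are assumptions
// made for this sketch, not the package's actual schema.
type consumedKey struct {
	BlockHash string
	LogIndex  uint
	JobID     string
}

// ConsumedStore is a hypothetical in-memory stand-in for the
// database-backed WasAlreadyConsumed / MarkConsumed pair.
type ConsumedStore struct {
	mu   sync.Mutex
	seen map[consumedKey]struct{}
}

// NewConsumedStore returns an empty store.
func NewConsumedStore() *ConsumedStore {
	return &ConsumedStore{seen: make(map[consumedKey]struct{})}
}

// WasAlreadyConsumed reports whether the key has been marked before.
func (s *ConsumedStore) WasAlreadyConsumed(k consumedKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.seen[k]
	return ok
}

// MarkConsumed records the key so later deliveries can be skipped.
func (s *ConsumedStore) MarkConsumed(k consumedKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[k] = struct{}{}
}

// HandleOnce runs work only if the key has not been consumed, then marks it.
// It reports whether work ran. The check and the mark are not one atomic
// step here; a database-backed version would typically use a transaction.
func HandleOnce(s *ConsumedStore, k consumedKey, work func()) bool {
	if s.WasAlreadyConsumed(k) {
		return false
	}
	work()
	s.MarkConsumed(k)
	return true
}

func main() {
	store := NewConsumedStore()
	key := consumedKey{BlockHash: "0xabc", LogIndex: 0, JobID: "job-1"}
	for i := 0; i < 2; i++ {
		ran := HandleOnce(store, key, func() {})
		fmt.Println("attempt", i, "ran:", ran)
	}
}
```

Keying on the job as well as the log means the same log can still be delivered to a different job, which is why the key in this sketch includes a job identifier.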
func NewLogBroadcast ¶
type Broadcaster ¶
type Broadcaster interface {
	utils.DependentAwaiter
	service.Service
	httypes.HeadTrackable
	IsConnected() bool
	Register(listener Listener, opts ListenerOpts) (unsubscribe func())
	LatestHead() *models.Head
	TrackedAddressesCount() uint32
	// DB interactions
	WasAlreadyConsumed(db *gorm.DB, lb Broadcast) (bool, error)
	MarkConsumed(db *gorm.DB, lb Broadcast) error
}
The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.
After a node crash or restart, logs are backfilled starting from the latest head saved in the DB, for subscribers that are added before all dependents of the LogBroadcaster are done.
If a subscriber is added after the LogBroadcaster has done the initial backfill, it is possible, even likely, that the backfill will only have a depth of 1 (from the latest head).
These backfilled logs, along with any new logs, are only sent once they have reached the NumConfirmations configured for the given subscriber.
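The multiplexing and confirmation gating described above can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not the package's implementation: `Mux`, `Log`, `OnLog`, and `OnNewHead` are hypothetical names, contracts are plain strings, and the rule "a log is delivered once its block number plus the subscriber's confirmation depth is at most the head number" is an assumed reading of NumConfirmations.

```go
package main

import "fmt"

// Log is a hypothetical stand-in for types.Log.
type Log struct {
	Contract    string
	BlockNumber uint64
	Data        string
}

// subscriber pairs a handler with its contract and confirmation depth.
type subscriber struct {
	contract         string
	numConfirmations uint64
	handle           func(Log)
	pending          []Log // logs seen but not yet confirmed deeply enough
}

// Mux fans a single incoming log stream out to many subscribers.
type Mux struct {
	subs []*subscriber
}

// Register adds a subscriber for one contract.
func (m *Mux) Register(contract string, numConfirmations uint64, handle func(Log)) {
	m.subs = append(m.subs, &subscriber{
		contract:         contract,
		numConfirmations: numConfirmations,
		handle:           handle,
	})
}

// OnLog queues a log for every subscriber watching its contract.
func (m *Mux) OnLog(l Log) {
	for _, s := range m.subs {
		if s.contract == l.Contract {
			s.pending = append(s.pending, l)
		}
	}
}

// OnNewHead delivers every queued log that is now deep enough for its
// subscriber and keeps the rest queued (assumed rule, see lead-in).
func (m *Mux) OnNewHead(head uint64) {
	for _, s := range m.subs {
		var waiting []Log
		for _, l := range s.pending {
			if l.BlockNumber+s.numConfirmations <= head {
				s.handle(l)
			} else {
				waiting = append(waiting, l)
			}
		}
		s.pending = waiting
	}
}

func main() {
	m := &Mux{}
	var fast, slow int
	m.Register("0xA", 1, func(Log) { fast++ })
	m.Register("0xA", 3, func(Log) { slow++ })

	m.OnLog(Log{Contract: "0xA", BlockNumber: 10, Data: "hello"})
	m.OnNewHead(11)
	fmt.Println("after head 11:", fast, slow)
	m.OnNewHead(13)
	fmt.Println("after head 13:", fast, slow)
}
```

Both subscribers share the one incoming stream; only the per-subscriber queue and confirmation depth differ, which is the point of multiplexing over a single connection.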
type Listener ¶
type Listener interface {
	HandleLog(b Broadcast)
	JobID() models.JobID
	JobIDV2() int32
	IsV2Job() bool
}
The Listener responds to log events through HandleLog.
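To make the Listener contract concrete, here is a minimal sketch of an implementation. Everything in it is an assumption for illustration: the `Broadcast` and `Listener` interfaces are reduced local stand-ins (with `string` replacing `models.JobID`), `Transfer`, `fakeBroadcast`, and `transferListener` are hypothetical, and `jobIDFor` shows only one plausible way a helper such as ListenerJobID might choose between the two job ID methods.

```go
package main

import "fmt"

// Broadcast is a reduced, hypothetical stand-in for the package's interface.
type Broadcast interface {
	DecodedLog() interface{}
	String() string
}

// Listener mirrors the documented method set; string replaces models.JobID
// so that the sketch stays self-contained.
type Listener interface {
	HandleLog(b Broadcast)
	JobID() string
	JobIDV2() int32
	IsV2Job() bool
}

// Transfer is a hypothetical decoded event.
type Transfer struct{ Amount uint64 }

// fakeBroadcast carries a pre-decoded value for the example.
type fakeBroadcast struct{ decoded interface{} }

func (f fakeBroadcast) DecodedLog() interface{} { return f.decoded }
func (f fakeBroadcast) String() string          { return fmt.Sprintf("Broadcast(%v)", f.decoded) }

// transferListener sums the amounts of the Transfer events it receives.
type transferListener struct {
	jobID int32
	total uint64
}

// HandleLog reacts only to the event types it understands.
func (l *transferListener) HandleLog(b Broadcast) {
	switch ev := b.DecodedLog().(type) {
	case Transfer:
		l.total += ev.Amount
	default:
		// Unknown event types are ignored in this sketch.
	}
}

func (l *transferListener) JobID() string  { return "" }
func (l *transferListener) JobIDV2() int32 { return l.jobID }
func (l *transferListener) IsV2Job() bool  { return true }

// jobIDFor picks the V2 ID for V2 jobs and the legacy ID otherwise.
// This selection rule is an assumption, not confirmed by the documentation.
func jobIDFor(l Listener) interface{} {
	if l.IsV2Job() {
		return l.JobIDV2()
	}
	return l.JobID()
}

func main() {
	l := &transferListener{jobID: 7}
	l.HandleLog(fakeBroadcast{decoded: Transfer{Amount: 5}})
	l.HandleLog(fakeBroadcast{decoded: "unrelated"})
	fmt.Println("total:", l.total, "job ID:", jobIDFor(l))
}
```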
type ListenerOpts ¶
type ListenerOpts struct {
	Contract common.Address

	// Event types to receive, with value filter for each field in the event
	// No filter or an empty filter for a given field position mean: all values allowed
	// the key should be a result of AbigenLog.Topic() call
	LogsWithTopics map[common.Hash][][]Topic

	ParseLog ParseLogFunc

	// Minimum number of block confirmations before the log is received
	NumConfirmations uint64
}
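The filter semantics in the LogsWithTopics comment can be sketched as a matching function. This is a minimal sketch under stated assumptions, not the package's matching code: `matchesFilter` is a hypothetical name, plain strings stand in for `common.Hash` and `Topic`, and it assumes the usual Ethereum layout in which the first topic of a log is the event signature and filter position i applies to topic i+1.

```go
package main

import "fmt"

// matchesFilter reports whether a log's topics pass the filters.
// filters maps an event signature to per-position lists of allowed values;
// a missing or empty list at a position allows every value there.
func matchesFilter(logTopics []string, filters map[string][][]string) bool {
	if len(logTopics) == 0 {
		return false
	}
	// The event type must have been requested at all.
	positions, ok := filters[logTopics[0]]
	if !ok {
		return false
	}
	for i, allowed := range positions {
		if len(allowed) == 0 {
			continue // empty filter: all values allowed
		}
		topicIdx := i + 1 // assumed: topic 0 is the event signature
		if topicIdx >= len(logTopics) {
			return false // a value is required but the log has none here
		}
		found := false
		for _, v := range allowed {
			if v == logTopics[topicIdx] {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func main() {
	filters := map[string][][]string{
		// Only transfers whose first indexed field is "alice"; any second field.
		"sigTransfer": {{"alice"}, {}},
		// Every approval, regardless of field values.
		"sigApproval": nil,
	}
	fmt.Println(matchesFilter([]string{"sigTransfer", "alice", "bob"}, filters))
	fmt.Println(matchesFilter([]string{"sigTransfer", "carol", "bob"}, filters))
	fmt.Println(matchesFilter([]string{"sigApproval", "dave"}, filters))
}
```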
type NullBroadcaster ¶
type NullBroadcaster struct{ ErrMsg string }
func (*NullBroadcaster) AddDependents ¶
func (n *NullBroadcaster) AddDependents(int)
func (*NullBroadcaster) AwaitDependents ¶
func (n *NullBroadcaster) AwaitDependents() <-chan struct{}
func (*NullBroadcaster) Close ¶
func (n *NullBroadcaster) Close() error
func (*NullBroadcaster) DependentReady ¶
func (n *NullBroadcaster) DependentReady()
func (*NullBroadcaster) Healthy ¶
func (n *NullBroadcaster) Healthy() error
func (*NullBroadcaster) IsConnected ¶
func (n *NullBroadcaster) IsConnected() bool
func (*NullBroadcaster) LatestHead ¶
func (n *NullBroadcaster) LatestHead() *models.Head
func (*NullBroadcaster) MarkConsumed ¶
func (n *NullBroadcaster) MarkConsumed(db *gorm.DB, lb Broadcast) error
func (*NullBroadcaster) OnNewLongestChain ¶
func (n *NullBroadcaster) OnNewLongestChain(context.Context, models.Head)
func (*NullBroadcaster) Ready ¶
func (n *NullBroadcaster) Ready() error
func (*NullBroadcaster) Register ¶
func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())
func (*NullBroadcaster) Start ¶
func (n *NullBroadcaster) Start() error
func (*NullBroadcaster) TrackedAddressesCount ¶
func (n *NullBroadcaster) TrackedAddressesCount() uint32