Documentation ¶
Overview ¶
Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, JobSubscriber, and Scheduler.
Application ¶
The Application is the main component used for starting and stopping the Chainlink node.
JobRunner ¶
The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.
JobSubscriber ¶
The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.
Scheduler ¶
The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.
Index ¶
- Constants
- Variables
- func BeginRun(job models.JobSpec, initr models.Initiator, input models.RunResult, ...) (models.JobRun, error)
- func BeginRunAtBlock(job models.JobSpec, initr models.Initiator, input models.RunResult, ...) (models.JobRun, error)
- func BuildRun(job models.JobSpec, i models.Initiator, store *store.Store) (models.JobRun, error)
- func ExecuteRun(jr models.JobRun, store *store.Store, overrides models.RunResult) (models.JobRun, error)
- func ExecuteRunAtBlock(jr models.JobRun, store *store.Store, overrides models.RunResult, ...) (models.JobRun, error)
- func NewInitiatorFilterQuery(initr models.Initiator, head *models.IndexableBlockNumber, ...) ethereum.FilterQuery
- func TopicFiltersForRunLog(jobID string) [][]common.Hash
- func ValidateAdapter(bt *models.BridgeType, store *store.Store) (err error)
- func ValidateInitiator(i models.Initiator, j models.JobSpec) error
- func ValidateJob(j models.JobSpec, store *store.Store) error
- func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
- type Afterer
- type Application
- type ChainlinkApplication
- func (app *ChainlinkApplication) AddAdapter(bt *models.BridgeType) error
- func (app *ChainlinkApplication) AddJob(job models.JobSpec) error
- func (app *ChainlinkApplication) GetStore() *store.Store
- func (app *ChainlinkApplication) RemoveAdapter(bt *models.BridgeType) error
- func (app *ChainlinkApplication) Start() error
- func (app *ChainlinkApplication) Stop() error
- type Cron
- type HeadTrackable
- type HeadTracker
- func (ht *HeadTracker) Attach(t HeadTrackable) string
- func (ht *HeadTracker) Detach(id string)
- func (ht *HeadTracker) IsConnected() bool
- func (ht *HeadTracker) LastRecord() *models.IndexableBlockNumber
- func (ht *HeadTracker) Save(n *models.IndexableBlockNumber) error
- func (ht *HeadTracker) Start() error
- func (ht *HeadTracker) Stop() error
- type InitiatorSubscription
- type InitiatorSubscriptionLogEvent
- func (le InitiatorSubscriptionLogEvent) ContractPayment() (*assets.Link, error)
- func (le InitiatorSubscriptionLogEvent) EthLogJSON() (models.JSON, error)
- func (le InitiatorSubscriptionLogEvent) ForLogger(kvs ...interface{}) []interface{}
- func (le InitiatorSubscriptionLogEvent) RunLogJSON() (models.JSON, error)
- func (le InitiatorSubscriptionLogEvent) ToDebug()
- func (le InitiatorSubscriptionLogEvent) ToIndexableBlockNumber() *models.IndexableBlockNumber
- func (le InitiatorSubscriptionLogEvent) ValidateRunLog() bool
- type JobRunner
- type JobRunnerError
- type JobSubscriber
- type JobSubscription
- type ManagedSubscription
- type Nower
- type OneTime
- type Reaper
- type Recurring
- type Scheduler
- type Unsubscriber
Constants ¶
const (
	RunLogTopicSignature = iota
	RunLogTopicInternalID
	RunLogTopicJobID
	RunLogTopicAmount
)
Descriptive indices of a RunLog's Topic array
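As a minimal sketch of how such indices might be used, the snippet below picks the job ID out of a topics slice. Topics are plain strings here for brevity (the real package works with common.Hash values), and the helper name jobIDTopic and the sample values are hypothetical.

```go
package main

import "fmt"

// Indices mirroring the constants documented above.
const (
	RunLogTopicSignature = iota
	RunLogTopicInternalID
	RunLogTopicJobID
	RunLogTopicAmount
)

// jobIDTopic returns the topic stored at the job ID index.
// It reports false when the slice is too short to contain one.
func jobIDTopic(topics []string) (string, bool) {
	if len(topics) <= RunLogTopicJobID {
		return "", false
	}
	return topics[RunLogTopicJobID], true
}

func main() {
	// Placeholder values, not real topic hashes.
	topics := []string{"signature", "internal-id", "job-id", "amount"}
	id, ok := jobIDTopic(topics)
	fmt.Println(id, ok)
}
```

Checking the slice length before indexing avoids a panic on logs that carry fewer topics than a RunLog is expected to have.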
const OracleFulfillmentFunctionID = "0x76005c26"
OracleFulfillmentFunctionID is the function ID of the oracle fulfillment method used by EthTx: bytes4(keccak256("fulfillData(uint256,bytes32)")). Kept in sync with solidity/contracts/Oracle.sol.
Variables ¶
var RunLogTopic = common.HexToHash("0x3fab86a1207bdcfe3976d0d9df25f263d45ae8d381a60960559771a2b223974d")
RunLogTopic is the signature for the RunRequest(...) event, which Chainlink RunLog initiators watch for (see https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Oracle.sol). If updating this, be sure to update the truffle suite's "expected event signature" test.
Functions ¶
func BeginRun ¶
func BeginRun(
	job models.JobSpec,
	initr models.Initiator,
	input models.RunResult,
	store *store.Store,
) (models.JobRun, error)
BeginRun creates a new run if the job is valid and starts the job.
func BeginRunAtBlock ¶
func BeginRunAtBlock(
	job models.JobSpec,
	initr models.Initiator,
	input models.RunResult,
	store *store.Store,
	bn *models.IndexableBlockNumber,
) (models.JobRun, error)
BeginRunAtBlock builds and executes a new run if the job is valid with the block number to determine if tasks should be resumed.
func BuildRun ¶
BuildRun checks to ensure the given job has not started or ended before creating a new run for the job.
func ExecuteRun ¶
func ExecuteRun(jr models.JobRun, store *store.Store, overrides models.RunResult) (models.JobRun, error)
ExecuteRun calls ExecuteRunAtBlock without an IndexableBlockNumber.
func ExecuteRunAtBlock ¶
func ExecuteRunAtBlock(
	jr models.JobRun,
	store *store.Store,
	overrides models.RunResult,
	bn *models.IndexableBlockNumber,
) (models.JobRun, error)
ExecuteRunAtBlock starts the job and executes task runs within that job in the order defined in the run for as long as they do not return errors. Results are saved in the store (db).
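The "in order, until an error" behavior described above can be sketched as a simple loop. This is not the package's implementation: the task type, runInOrder, and the choice to feed each result into the next task are assumptions made for illustration, and results are kept in a slice rather than saved to the store.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for a task run: it transforms an input string.
type task func(input string) (string, error)

// runInOrder executes tasks sequentially and stops at the first error.
// It returns the results collected before the failure.
func runInOrder(tasks []task, input string) ([]string, error) {
	results := make([]string, 0, len(tasks))
	for i, t := range tasks {
		out, err := t(input)
		if err != nil {
			return results, fmt.Errorf("task %d: %w", i, err)
		}
		results = append(results, out) // the real package saves results in the store
		input = out                    // assumption: each task receives the previous result
	}
	return results, nil
}

// appendX and alwaysFail are illustrative tasks.
func appendX(s string) (string, error)  { return s + "x", nil }
func alwaysFail(string) (string, error) { return "", errors.New("task failed") }

func main() {
	results, err := runInOrder([]task{appendX, alwaysFail, appendX}, "")
	fmt.Println(results, err)
}
```

In this sketch the third task never runs, because the second one returns an error.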
func NewInitiatorFilterQuery ¶
func NewInitiatorFilterQuery(
	initr models.Initiator,
	head *models.IndexableBlockNumber,
	topics [][]common.Hash,
) ethereum.FilterQuery
NewInitiatorFilterQuery returns a new ethereum.FilterQuery initialized from the given initiator, head, and topics.
func TopicFiltersForRunLog ¶
TopicFiltersForRunLog generates the two variations of RunLog IDs that could possibly be entered. There is the ID, hex encoded and the ID zero padded.
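One possible reading of the two variations is sketched below, using only the standard library. It assumes a 32-character hexadecimal job ID, that the first form is the ID's ASCII characters stored as a 32-byte word, and that the second is the decoded 16 bytes padded with zeros on the right. The word type and idVariants are hypothetical names, and the actual encoding used by the package may differ.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// word is a 32-byte EVM word, standing in for common.Hash.
type word [32]byte

// idVariants returns two candidate encodings of a job ID:
// index 0 holds the ID's ASCII characters, index 1 the decoded bytes
// followed by zero padding. Both interpretations are assumptions.
func idVariants(jobID string) ([2]word, error) {
	var v [2]word
	if len(jobID) != 32 {
		return v, fmt.Errorf("want a 32-character ID, got %d characters", len(jobID))
	}
	raw, err := hex.DecodeString(jobID)
	if err != nil {
		return v, err
	}
	copy(v[0][:], jobID) // 32 ASCII characters fill the word exactly
	copy(v[1][:], raw)   // 16 decoded bytes; the remaining 16 stay zero
	return v, nil
}

func main() {
	v, err := idVariants("0123456789abcdef0123456789abcdef")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%x\n%x\n", v[0], v[1])
}
```

Filtering on both forms lets a subscription match a request regardless of which encoding the requester used.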
func ValidateAdapter ¶
func ValidateAdapter(bt *models.BridgeType, store *store.Store) (err error)
ValidateAdapter checks that the bridge type doesn't have a duplicate or invalid name.
func ValidateInitiator ¶
ValidateInitiator checks the Initiator for any application logic errors.
func ValidateJob ¶
ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.
func ValidateServiceAgreement ¶
func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.
Types ¶
type Afterer ¶
Afterer is an interface that fulfills the After method, following the behavior of time.After.
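An interface of this kind is typically used so that tests can replace real timers. The sketch below assumes the method has the same signature as time.After; realAfterer, instantAfterer, and waitFor are hypothetical names that are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Afterer mirrors the shape of time.After (assumed signature).
type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

// realAfterer delegates to the standard library.
type realAfterer struct{}

func (realAfterer) After(d time.Duration) <-chan time.Time { return time.After(d) }

// instantAfterer is a test double that fires immediately, ignoring the duration.
type instantAfterer struct{}

func (instantAfterer) After(time.Duration) <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- time.Now()
	return ch
}

// Compile-time check that both types satisfy the interface.
var (
	_ Afterer = realAfterer{}
	_ Afterer = instantAfterer{}
)

// waitFor blocks until the Afterer fires and returns the delivered time.
func waitFor(a Afterer, d time.Duration) time.Time { return <-a.After(d) }

func main() {
	// Returns at once even though an hour was requested.
	fired := waitFor(instantAfterer{}, time.Hour)
	fmt.Println(!fired.IsZero())
}
```

Code that depends on the interface rather than on time.After directly can be exercised in tests without real delays.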
type Application ¶
Application implements the common functions used in the core node.
func NewApplication ¶
func NewApplication(config store.Config) Application
NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), the logger at the same directory and returns the Application to be used by the node.
type ChainlinkApplication ¶
type ChainlinkApplication struct {
	Exiter        func(int)
	HeadTracker   *HeadTracker
	JobRunner     JobRunner
	JobSubscriber JobSubscriber
	Scheduler     *Scheduler
	Store         *store.Store
	Reaper        Reaper
	// contains filtered or unexported fields
}
ChainlinkApplication contains fields for the JobSubscriber, Scheduler, and Store. The JobSubscriber and Scheduler are also available in the services package, but the Store has its own package.
func (*ChainlinkApplication) AddAdapter ¶
func (app *ChainlinkApplication) AddAdapter(bt *models.BridgeType) error
AddAdapter adds an adapter to the store. If another adapter with the same name already exists the adapter will not be added.
func (*ChainlinkApplication) AddJob ¶
func (app *ChainlinkApplication) AddJob(job models.JobSpec) error
AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.
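The ordering described here (store first, scheduler only on success) can be sketched as follows. The jobStore and jobScheduler interfaces, the string job IDs, and the helper types are hypothetical stand-ins, not the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// jobStore and jobScheduler are reduced, hypothetical interfaces.
type jobStore interface{ SaveJob(id string) error }
type jobScheduler interface{ AddJob(id string) }

// addJob persists the job first and schedules it only if that succeeded.
func addJob(s jobStore, sch jobScheduler, id string) error {
	if err := s.SaveJob(id); err != nil {
		return err // the job is never scheduled when it cannot be stored
	}
	sch.AddJob(id)
	return nil
}

// failingStore always rejects jobs.
type failingStore struct{}

func (failingStore) SaveJob(string) error { return errors.New("store unavailable") }

// memoryStore keeps job IDs in memory.
type memoryStore struct{ ids []string }

func (m *memoryStore) SaveJob(id string) error { m.ids = append(m.ids, id); return nil }

// recordingScheduler remembers which jobs were scheduled.
type recordingScheduler struct{ ids []string }

func (r *recordingScheduler) AddJob(id string) { r.ids = append(r.ids, id) }

func main() {
	sch := &recordingScheduler{}
	fmt.Println(addJob(failingStore{}, sch, "job-1"), sch.ids)
	fmt.Println(addJob(&memoryStore{}, sch, "job-2"), sch.ids)
}
```

Returning early on a store error keeps the scheduler from running a job that would not survive a restart.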
func (*ChainlinkApplication) GetStore ¶
func (app *ChainlinkApplication) GetStore() *store.Store
GetStore returns the pointer to the store for the ChainlinkApplication.
func (*ChainlinkApplication) RemoveAdapter ¶
func (app *ChainlinkApplication) RemoveAdapter(bt *models.BridgeType) error
RemoveAdapter removes an adapter from the store.
func (*ChainlinkApplication) Start ¶
func (app *ChainlinkApplication) Start() error
Start runs the JobSubscriber and Scheduler. If successful, nil will be returned. Also listens for interrupt signals from the operating system so that the application can be properly closed before the application exits.
func (*ChainlinkApplication) Stop ¶
func (app *ChainlinkApplication) Stop() error
Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.
type Cron ¶
Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
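To illustrate the extra leading field, the sketch below splits a schedule into its seconds field and the five standard fields. It only counts fields and does not validate their contents; splitSchedule is a hypothetical helper, and the sample schedule is an assumption about how such a spec would look.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSchedule separates the leading seconds field from the five
// standard cron fields (minute, hour, day of month, month, day of week).
func splitSchedule(spec string) (seconds string, rest []string, err error) {
	fields := strings.Fields(spec)
	if len(fields) != 6 {
		return "", nil, fmt.Errorf("want 6 fields (seconds first), got %d", len(fields))
	}
	return fields[0], fields[1:], nil
}

func main() {
	// Second 0 of every fifth minute.
	sec, rest, err := splitSchedule("0 */5 * * * *")
	fmt.Println(sec, rest, err)

	// A standard five-field spec is rejected because the seconds field is missing.
	_, _, err = splitSchedule("*/5 * * * *")
	fmt.Println(err)
}
```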
type HeadTrackable ¶
type HeadTrackable interface {
	Connect(*models.IndexableBlockNumber) error
	Disconnect()
	OnNewHead(*models.BlockHeader)
}
HeadTrackable represents any object that wishes to respond to ethereum events, after being attached to HeadTracker.
type HeadTracker ¶
type HeadTracker struct {
// contains filtered or unexported fields
}
HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.
func NewHeadTracker ¶
func NewHeadTracker(store *store.Store, sleepers ...utils.Sleeper) *HeadTracker
NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.
func (*HeadTracker) Attach ¶
func (ht *HeadTracker) Attach(t HeadTrackable) string
Attach registers an object that will have HeadTrackable events fired on occurrence, such as Connect.
func (*HeadTracker) Detach ¶
func (ht *HeadTracker) Detach(id string)
Detach deregisters an object from having HeadTrackable events fired.
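The Attach/Detach pairing, where Attach returns an ID that is later passed to Detach, can be sketched with a small registry. The trackable interface is a reduced, hypothetical version of HeadTrackable, and the ID scheme is illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// trackable is a reduced, hypothetical version of HeadTrackable.
type trackable interface {
	OnNewHead(number uint64)
}

// registry hands out an ID on Attach so the caller can Detach later.
type registry struct {
	mu       sync.RWMutex
	nextID   int
	trackers map[string]trackable
}

func newRegistry() *registry { return &registry{trackers: map[string]trackable{}} }

// Attach registers t and returns the ID needed to remove it.
func (r *registry) Attach(t trackable) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	id := fmt.Sprintf("tracker-%d", r.nextID) // illustrative ID scheme
	r.trackers[id] = t
	return id
}

// Detach removes the tracker registered under id, if any.
func (r *registry) Detach(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.trackers, id)
}

// notify delivers a new head to every attached tracker.
func (r *registry) notify(number uint64) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, t := range r.trackers {
		t.OnNewHead(number)
	}
}

// counter records how many heads it has seen.
type counter struct{ seen int }

func (c *counter) OnNewHead(uint64) { c.seen++ }

func main() {
	r := newRegistry()
	c := &counter{}
	id := r.Attach(c)
	r.notify(1)
	r.Detach(id)
	r.notify(2) // not delivered: c was detached
	fmt.Println(c.seen)
}
```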
func (*HeadTracker) IsConnected ¶
func (ht *HeadTracker) IsConnected() bool
IsConnected returns whether or not this HeadTracker is connected.
func (*HeadTracker) LastRecord ¶
func (ht *HeadTracker) LastRecord() *models.IndexableBlockNumber
LastRecord returns the latest block header being tracked, or nil.
func (*HeadTracker) Save ¶
func (ht *HeadTracker) Save(n *models.IndexableBlockNumber) error
Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
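A thread-safe "update only if newer" save might look like the sketch below. The headStore type, its persist callback, and the decision to skip persistence for older numbers are assumptions made for illustration; block numbers are plain uint64 values rather than IndexableBlockNumber.

```go
package main

import (
	"fmt"
	"sync"
)

// headStore is a hypothetical holder for the latest block number.
type headStore struct {
	mu      sync.Mutex
	latest  uint64
	persist func(uint64) error // stands in for writing to the data store
}

func newHeadStore(persist func(uint64) error) *headStore {
	return &headStore{persist: persist}
}

// Save records n only when it is newer than the current latest number.
func (h *headStore) Save(n uint64) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if n <= h.latest {
		return nil // assumption: older numbers are ignored
	}
	if err := h.persist(n); err != nil {
		return err
	}
	h.latest = n
	return nil
}

// Latest returns the highest number saved so far.
func (h *headStore) Latest() uint64 {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.latest
}

// noopPersist pretends that persistence always succeeds.
func noopPersist(uint64) error { return nil }

// saveConcurrently saves the numbers 1..count from separate goroutines.
func saveConcurrently(h *headStore, count uint64) {
	var wg sync.WaitGroup
	for n := uint64(1); n <= count; n++ {
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			_ = h.Save(n)
		}(n)
	}
	wg.Wait()
}

func main() {
	h := newHeadStore(noopPersist)
	saveConcurrently(h, 100)
	fmt.Println(h.Latest())
}
```

Because the comparison and the update happen under the same lock, the stored value ends at the highest number regardless of the order in which goroutines run.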
func (*HeadTracker) Start ¶
func (ht *HeadTracker) Start() error
Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.
func (*HeadTracker) Stop ¶
func (ht *HeadTracker) Stop() error
Stop unsubscribes all connections and fires Disconnect.
type InitiatorSubscription ¶
type InitiatorSubscription struct {
	*ManagedSubscription
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}
InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.
func NewInitiatorSubscription ¶
func NewInitiatorSubscription(
	initr models.Initiator,
	job models.JobSpec,
	store *store.Store,
	filter ethereum.FilterQuery,
	callback func(InitiatorSubscriptionLogEvent),
) (InitiatorSubscription, error)
NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.
type InitiatorSubscriptionLogEvent ¶
type InitiatorSubscriptionLogEvent struct {
	Log       types.Log
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}
InitiatorSubscriptionLogEvent encapsulates all information as a result of a received log from an InitiatorSubscription.
func (InitiatorSubscriptionLogEvent) ContractPayment ¶
func (le InitiatorSubscriptionLogEvent) ContractPayment() (*assets.Link, error)
ContractPayment returns the amount attached to a contract to pay the Oracle upon fulfillment.
func (InitiatorSubscriptionLogEvent) EthLogJSON ¶
func (le InitiatorSubscriptionLogEvent) EthLogJSON() (models.JSON, error)
EthLogJSON reformats the log as JSON.
func (InitiatorSubscriptionLogEvent) ForLogger ¶
func (le InitiatorSubscriptionLogEvent) ForLogger(kvs ...interface{}) []interface{}
ForLogger formats the InitiatorSubscriptionLogEvent for easy common formatting in logs (trace statements, not ethereum events).
func (InitiatorSubscriptionLogEvent) RunLogJSON ¶
func (le InitiatorSubscriptionLogEvent) RunLogJSON() (models.JSON, error)
RunLogJSON extracts data from the log's topics and data specific to the format defined by RunLogs.
func (InitiatorSubscriptionLogEvent) ToDebug ¶
func (le InitiatorSubscriptionLogEvent) ToDebug()
ToDebug prints this event via logger.Debug.
func (InitiatorSubscriptionLogEvent) ToIndexableBlockNumber ¶
func (le InitiatorSubscriptionLogEvent) ToIndexableBlockNumber() *models.IndexableBlockNumber
ToIndexableBlockNumber returns an IndexableBlockNumber for the given InitiatorSubscriptionLogEvent's block.
func (InitiatorSubscriptionLogEvent) ValidateRunLog ¶
func (le InitiatorSubscriptionLogEvent) ValidateRunLog() bool
ValidateRunLog returns whether or not the contained log is a RunLog, a specific Chainlink event trigger from smart contracts.
type JobRunner ¶
type JobRunner interface {
	Start() error
	Stop()
	// contains filtered or unexported methods
}
JobRunner safely handles coordinating job runs.
func NewJobRunner ¶
NewJobRunner initializes a JobRunner.
type JobRunnerError ¶
type JobRunnerError struct {
// contains filtered or unexported fields
}
JobRunnerError contains the field for the error message.
func (JobRunnerError) Error ¶
func (err JobRunnerError) Error() string
Error returns the error message for the run.
type JobSubscriber ¶
type JobSubscriber interface {
	HeadTrackable
	AddJob(job models.JobSpec, bn *models.IndexableBlockNumber) error
	Jobs() []models.JobSpec
}
JobSubscriber listens for push notifications from the ethereum node's websocket for specific jobs.
func NewJobSubscriber ¶
func NewJobSubscriber(store *store.Store) JobSubscriber
NewJobSubscriber returns a new job subscriber.
type JobSubscription ¶
JobSubscription listens to event logs being pushed from the Ethereum Node to a job.
func StartJobSubscription ¶
func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (JobSubscription, error)
StartJobSubscription is the constructor of JobSubscription that starts listening to and keeps track of event logs corresponding to a job.
func (JobSubscription) Unsubscribe ¶
func (js JobSubscription) Unsubscribe()
Unsubscribe stops the subscription and cleans up associated resources.
type ManagedSubscription ¶
type ManagedSubscription struct {
// contains filtered or unexported fields
}
ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.
func NewManagedSubscription ¶
func NewManagedSubscription(
	store *store.Store,
	filter ethereum.FilterQuery,
	callback func(types.Log),
) (*ManagedSubscription, error)
NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.
func (ManagedSubscription) Unsubscribe ¶
func (sub ManagedSubscription) Unsubscribe()
Unsubscribe closes channels and cleans up resources.
type Nower ¶
Nower is an interface that fulfills the Now method, following the behavior of time.Now.
type OneTime ¶
OneTime represents runs that are to be executed only once.
func (*OneTime) RunJobAt ¶
RunJobAt waits until either the Stop() function has been called on the run or the specified time for the run has passed.
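Waiting on "stopped or due, whichever comes first" is commonly written as a select over a done channel and a timer. The sketch below shows that pattern under assumed names (runAt, demo); it is not the package's implementation and uses the standard library timer rather than the Afterer interface.

```go
package main

import (
	"fmt"
	"time"
)

// runAt waits until runTime has passed and then calls run.
// It returns false without calling run if done is closed first.
func runAt(runTime time.Time, done <-chan struct{}, run func()) bool {
	timer := time.NewTimer(time.Until(runTime))
	defer timer.Stop()
	select {
	case <-done:
		return false
	case <-timer.C:
		run()
		return true
	}
}

// demo exercises both outcomes: a run that is already due, and a run
// that is stopped long before its time arrives.
func demo() (ranWhenDue, ranWhenStopped bool) {
	open := make(chan struct{})
	ranWhenDue = runAt(time.Now().Add(-time.Second), open, func() {})

	stopped := make(chan struct{})
	close(stopped)
	ranWhenStopped = runAt(time.Now().Add(time.Hour), stopped, func() {})
	return ranWhenDue, ranWhenStopped
}

func main() {
	fmt.Println(demo())
}
```

A run time in the past yields a non-positive duration, so the timer fires immediately and the job runs without delay.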
type Reaper ¶
Reaper interface defines the methods used to reap stale objects such as sessions.
func NewStoreReaper ¶
NewStoreReaper creates a reaper that cleans stale objects from the store.
type Recurring ¶
Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().
func NewRecurring ¶
NewRecurring creates a new instance of Recurring, ready to use.
func (*Recurring) AddJob ¶
AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.
type Scheduler ¶
type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}
Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.
func NewScheduler ¶
NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.
func (*Scheduler) AddJob ¶
AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.
type Unsubscriber ¶
type Unsubscriber interface {
Unsubscribe()
}
Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.
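A small interface like this lets callers clean up different subscription types uniformly. The sketch below repeats the interface so that it compiles on its own; channelSub, newChannelSub, and unsubscribeAll are hypothetical, and making Unsubscribe safe to call twice is a design choice of the sketch, not a documented guarantee of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Unsubscriber matches the interface shown above.
type Unsubscriber interface {
	Unsubscribe()
}

// channelSub is a hypothetical subscription that signals shutdown
// by closing its done channel.
type channelSub struct {
	once sync.Once
	done chan struct{}
}

func newChannelSub() *channelSub { return &channelSub{done: make(chan struct{})} }

// Unsubscribe closes the done channel exactly once, so repeated calls are safe.
func (s *channelSub) Unsubscribe() { s.once.Do(func() { close(s.done) }) }

// closed reports whether Unsubscribe has been called.
func (s *channelSub) closed() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// unsubscribeAll releases every subscription in the slice.
func unsubscribeAll(subs []Unsubscriber) {
	for _, s := range subs {
		s.Unsubscribe()
	}
}

func main() {
	a, b := newChannelSub(), newChannelSub()
	unsubscribeAll([]Unsubscriber{a, b})
	a.Unsubscribe() // second call is a no-op
	fmt.Println(a.closed(), b.closed())
}
```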
func StartEthLogSubscription ¶
func StartEthLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)
StartEthLogSubscription starts an InitiatorSubscription tailored for use with EthLogs.
func StartRunLogSubscription ¶
func StartRunLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)
StartRunLogSubscription starts an InitiatorSubscription tailored for use with RunLogs.