Documentation ¶
Overview ¶
Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, EthereumListener, and Scheduler.
Application ¶
The Application is the main component used for starting and stopping the Chainlink node.
JobRunner ¶
The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.
EthereumListener ¶
The EthereumListener coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.
Scheduler ¶
The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.
Index ¶
- Constants
- Variables
- func BeginRun(job models.JobSpec, store *store.Store, input models.RunResult) (models.JobRun, error)
- func BuildRun(job models.JobSpec, store *store.Store) (models.JobRun, error)
- func ExecuteRun(run models.JobRun, store *store.Store, input models.RunResult) (models.JobRun, error)
- func ReceiveEthLog(le RPCLogEvent)
- func ReceiveRunLog(le RPCLogEvent)
- func ValidateInitiator(i models.Initiator, j models.JobSpec) error
- func ValidateJob(j models.JobSpec, store *store.Store) error
- type Afterer
- type Application
- type ChainlinkApplication
- type Cron
- type EthereumListener
- func (el *EthereumListener) AddJob(job models.JobSpec) error
- func (el *EthereumListener) Connect() error
- func (el *EthereumListener) Disconnect()
- func (el *EthereumListener) Jobs() []models.JobSpec
- func (el *EthereumListener) OnNewHead(_ *models.BlockHeader)
- func (el *EthereumListener) Start() error
- func (el *EthereumListener) Stop() error
- type HeadTrackable
- type HeadTracker
- func (ht *HeadTracker) Attach(t HeadTrackable) string
- func (ht *HeadTracker) Connect()
- func (ht *HeadTracker) Detach(id string)
- func (ht *HeadTracker) Disconnect()
- func (ht *HeadTracker) Get() *models.IndexableBlockNumber
- func (ht *HeadTracker) IsConnected() bool
- func (ht *HeadTracker) OnNewHead(head *models.BlockHeader)
- func (ht *HeadTracker) Save(n *models.IndexableBlockNumber) error
- func (ht *HeadTracker) Start() error
- func (ht *HeadTracker) Stop() error
- type JobRunnerError
- type JobSubscription
- type NoOpHeadTrackable
- type Nower
- type OneTime
- type RPCLogEvent
- type RPCLogSubscription
- type Recurring
- type Scheduler
- type Unsubscriber
Constants ¶
const (
	EventTopicSignature = iota
	EventTopicRequestID
	EventTopicJobID
)
Descriptive indices of a RunLog's Topic array
Variables ¶
var RunLogTopic = common.HexToHash("0x06f4bf36b4e011a5c499cef1113c2d166800ce4013f6c2509cab1a0e92b83fb2")
RunLogTopic is the signature for the Request(uint256,bytes32,string) event which Chainlink RunLog initiators watch for. See https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Oracle.sol
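To make the relationship between the topic indices and RunLogTopic concrete, here is a minimal sketch. It assumes topics are plain hex strings rather than the hash type the package uses, and the helpers isRunLog and jobIDTopic are hypothetical names introduced only for illustration; they are not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// Positional indices into a RunLog's topics, mirroring the constants above.
const (
	EventTopicSignature = iota
	EventTopicRequestID
	EventTopicJobID
)

// runLogTopic holds the RunLogTopic value as a hex string.
// Assumption: the real package stores it as a hash type, not a string.
const runLogTopic = "0x06f4bf36b4e011a5c499cef1113c2d166800ce4013f6c2509cab1a0e92b83fb2"

// isRunLog is a hypothetical check: the topic in the signature position
// must equal runLogTopic (compared case-insensitively).
func isRunLog(topics []string) bool {
	if len(topics) <= EventTopicSignature {
		return false
	}
	return strings.EqualFold(topics[EventTopicSignature], runLogTopic)
}

// jobIDTopic is a hypothetical helper that returns the topic in the job ID
// position, or false when the log carries too few topics.
func jobIDTopic(topics []string) (string, bool) {
	if len(topics) <= EventTopicJobID {
		return "", false
	}
	return topics[EventTopicJobID], true
}

func main() {
	topics := []string{runLogTopic, "0xrequest", "0xjob"}
	jobID, ok := jobIDTopic(topics)
	fmt.Println(isRunLog(topics), jobID, ok)
}
```

Named indices keep the topic layout in one place, so a change in the event's argument order only touches the constant block.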
Functions ¶
func BeginRun ¶
func BeginRun(job models.JobSpec, store *store.Store, input models.RunResult) (models.JobRun, error)
BeginRun creates a new run if the job is valid and starts the job.
func BuildRun ¶
func BuildRun(job models.JobSpec, store *store.Store) (models.JobRun, error)
BuildRun checks to ensure the given job has not started or ended before creating a new run for the job.
func ExecuteRun ¶
func ExecuteRun(run models.JobRun, store *store.Store, input models.RunResult) (models.JobRun, error)
ExecuteRun starts the job and executes task runs within that job in the order defined in the run for as long as they do not return errors. Results are saved in the store (db).
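The in-order, stop-on-first-error behaviour described for ExecuteRun can be sketched as follows. The task type, executeInOrder, and errBoom are hypothetical stand-ins, and the sketch assumes each task's output becomes the next task's input, which the description above does not confirm. A slice stands in for the store.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for a task run: it turns an input into an
// output or fails.
type task func(input string) (string, error)

// errBoom is a sample failure used by the demonstration below.
var errBoom = errors.New("boom")

// executeInOrder runs tasks sequentially and stops at the first error.
// It returns the results of the tasks that completed before the failure.
// Assumption: each result is fed to the next task as its input.
func executeInOrder(tasks []task, input string) ([]string, error) {
	results := make([]string, 0, len(tasks))
	current := input
	for i, t := range tasks {
		out, err := t(current)
		if err != nil {
			return results, fmt.Errorf("task %d: %w", i, err)
		}
		results = append(results, out) // stand-in for saving to the store
		current = out
	}
	return results, nil
}

func main() {
	appendA := func(in string) (string, error) { return in + "a", nil }
	fail := func(string) (string, error) { return "", errBoom }
	appendB := func(in string) (string, error) { return in + "b", nil }

	results, err := executeInOrder([]task{appendA, fail, appendB}, "x")
	fmt.Println(results, err)
}
```

In this sketch the third task never runs, because the second one returned an error.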
func ReceiveEthLog ¶
func ReceiveEthLog(le RPCLogEvent)
Parse the log and run the job specific to this initiator log event.
func ReceiveRunLog ¶
func ReceiveRunLog(le RPCLogEvent)
Parse the log and run the job specific to this initiator log event.
func ValidateInitiator ¶
func ValidateInitiator(i models.Initiator, j models.JobSpec) error
ValidateInitiator checks the Initiator for any application logic errors.
Types ¶
type Afterer ¶
Afterer is an interface that fulfills the After method, following the behavior of time.After.
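An interface shaped like time.After is typically useful for substituting a fake clock in tests. The sketch below assumes the method signature matches time.After exactly, which this page does not show; realClock, instantClock, and waitThen are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Afterer mirrors time.After. Assumption: the method signature below is
// inferred from the description, not copied from the package.
type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

// realClock delegates to the standard library.
type realClock struct{}

func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }

// instantClock is a test double whose channel is readable immediately,
// regardless of the requested duration.
type instantClock struct{}

func (instantClock) After(time.Duration) <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- time.Now()
	return ch
}

// waitThen blocks until the Afterer fires, then calls fn.
func waitThen(a Afterer, d time.Duration, fn func()) {
	<-a.After(d)
	fn()
}

func main() {
	waitThen(instantClock{}, time.Hour, func() { fmt.Println("ran without waiting") })
	waitThen(realClock{}, 10*time.Millisecond, func() { fmt.Println("ran after a real delay") })
}
```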
type Application ¶
Application implements the common functions used in the core node.
func NewApplication ¶
func NewApplication(config store.Config) Application
NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), initializes the logger in the same directory, and returns the Application to be used by the node.
type ChainlinkApplication ¶
type ChainlinkApplication struct {
	HeadTracker      *HeadTracker
	EthereumListener *EthereumListener
	Scheduler        *Scheduler
	Store            *store.Store
}
ChainlinkApplication contains fields for the HeadTracker, EthereumListener, Scheduler, and Store. The HeadTracker, EthereumListener, and Scheduler are also available in the services package, but the Store has its own package.
func (*ChainlinkApplication) AddJob ¶
func (app *ChainlinkApplication) AddJob(job models.JobSpec) error
AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.
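The ordering guarantee described here (persist first, schedule only on success) can be sketched with function-valued stand-ins. The app and jobSpec types, their fields, and errStoreDown are hypothetical and only illustrate the guard; they do not reflect how ChainlinkApplication is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// jobSpec is a hypothetical stand-in for models.JobSpec.
type jobSpec struct{ ID string }

// app is a hypothetical stand-in for the application.
type app struct {
	save     func(jobSpec) error // stand-in for persisting to the store
	schedule func(jobSpec)       // stand-in for handing the job to the scheduler
}

// AddJob persists the job first and only schedules it when that succeeds.
func (a app) AddJob(j jobSpec) error {
	if err := a.save(j); err != nil {
		return err
	}
	a.schedule(j)
	return nil
}

// errStoreDown is a sample store failure.
var errStoreDown = errors.New("store unavailable")

func main() {
	scheduled := []string{}
	a := app{
		save:     func(jobSpec) error { return errStoreDown },
		schedule: func(j jobSpec) { scheduled = append(scheduled, j.ID) },
	}
	fmt.Println(a.AddJob(jobSpec{ID: "job-1"}), len(scheduled))
}
```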
func (*ChainlinkApplication) GetStore ¶
func (app *ChainlinkApplication) GetStore() *store.Store
GetStore returns the pointer to the store for the ChainlinkApplication.
func (*ChainlinkApplication) Start ¶
func (app *ChainlinkApplication) Start() error
Start runs the Store, EthereumListener, and Scheduler. If successful, nil will be returned.
func (*ChainlinkApplication) Stop ¶
func (app *ChainlinkApplication) Stop() error
Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.
type Cron ¶
Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
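To illustrate the six-field layout, the sketch below splits a schedule string into named fields. The field order (seconds first, then the five standard cron fields) follows the description above; splitSchedule and fieldNames are hypothetical helpers, and no field values are validated.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames lists the assumed field order: the five standard cron fields
// preceded by a seconds field.
var fieldNames = []string{"second", "minute", "hour", "day-of-month", "month", "day-of-week"}

// splitSchedule is a hypothetical helper that splits spec on whitespace and
// returns a map from field name to value. It only checks the field count.
func splitSchedule(spec string) (map[string]string, error) {
	parts := strings.Fields(spec)
	if len(parts) != len(fieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(fieldNames), len(parts))
	}
	fields := make(map[string]string, len(parts))
	for i, name := range fieldNames {
		fields[name] = parts[i]
	}
	return fields, nil
}

func main() {
	// Second 0 of every fifth minute.
	fields, err := splitSchedule("0 */5 * * * *")
	fmt.Println(fields["second"], fields["minute"], err)

	// A standard five-field spec is rejected because the seconds field is missing.
	_, err = splitSchedule("*/5 * * * *")
	fmt.Println(err)
}
```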
type EthereumListener ¶
type EthereumListener struct {
	Store       *store.Store
	HeadTracker *HeadTracker
	// contains filtered or unexported fields
}
EthereumListener manages push notifications from the ethereum node's websocket to listen for new heads and log events.
func (*EthereumListener) AddJob ¶
func (el *EthereumListener) AddJob(job models.JobSpec) error
AddJob looks for "runlog" and "ethlog" Initiators for a given job and watches the Ethereum blockchain for the addresses in the job.
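Selecting initiators by type, as AddJob is described as doing, might look like the sketch below. The initiator and jobSpec types and the initiatorsFor helper are hypothetical stand-ins for the models package; only the type strings "runlog" and "ethlog" come from the description above.

```go
package main

import "fmt"

// initiator and jobSpec are hypothetical stand-ins for the models types.
type initiator struct {
	Type    string
	Address string
}

type jobSpec struct {
	Initiators []initiator
}

// initiatorsFor returns the job's initiators whose Type matches any of the
// given types, preserving their original order.
func initiatorsFor(j jobSpec, types ...string) []initiator {
	var matched []initiator
	for _, in := range j.Initiators {
		for _, t := range types {
			if in.Type == t {
				matched = append(matched, in)
				break
			}
		}
	}
	return matched
}

func main() {
	job := jobSpec{Initiators: []initiator{
		{Type: "cron"},
		{Type: "runlog", Address: "0x01"},
		{Type: "ethlog", Address: "0x02"},
	}}
	for _, in := range initiatorsFor(job, "runlog", "ethlog") {
		fmt.Println("would subscribe to", in.Address, "for", in.Type)
	}
}
```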
func (*EthereumListener) Connect ¶
func (el *EthereumListener) Connect() error
func (*EthereumListener) Disconnect ¶
func (el *EthereumListener) Disconnect()
func (*EthereumListener) Jobs ¶
func (el *EthereumListener) Jobs() []models.JobSpec
func (*EthereumListener) OnNewHead ¶
func (el *EthereumListener) OnNewHead(_ *models.BlockHeader)
func (*EthereumListener) Start ¶
func (el *EthereumListener) Start() error
Start obtains the jobs from the store and subscribes to logs and newHeads in order to start and resume jobs waiting on events or confirmations.
func (*EthereumListener) Stop ¶
func (el *EthereumListener) Stop() error
Stop gracefully closes its access to the store's EthNotifications and resets resources.
type HeadTrackable ¶
type HeadTrackable interface {
	Connect() error
	Disconnect()
	OnNewHead(*models.BlockHeader)
}
type HeadTracker ¶
type HeadTracker struct {
// contains filtered or unexported fields
}
Holds and stores the latest block number experienced by this particular node in a thread-safe manner. Reconstitutes the last block number from the data store on reboot.
func NewHeadTracker ¶
func NewHeadTracker(store *store.Store, sleepers ...utils.Sleeper) *HeadTracker
Instantiates a new HeadTracker using the orm to persist new block numbers
func (*HeadTracker) Attach ¶
func (ht *HeadTracker) Attach(t HeadTrackable) string
func (*HeadTracker) Connect ¶
func (ht *HeadTracker) Connect()
func (*HeadTracker) Detach ¶
func (ht *HeadTracker) Detach(id string)
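The Attach and Detach signatures suggest a registry keyed by string IDs that fans new heads out to its listeners. The following sketch shows one way such a registry could work; the registry, header, trackable, and counter types are hypothetical, and the sequential ID scheme is an assumption (the real ID format is not documented here).

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// header is a hypothetical stand-in for models.BlockHeader.
type header struct{ Number int64 }

// trackable is a reduced, hypothetical version of HeadTrackable.
type trackable interface {
	OnNewHead(*header)
}

// registry hands out an ID per attached listener and notifies all of them.
type registry struct {
	mu    sync.Mutex
	next  int
	items map[string]trackable
}

func newRegistry() *registry {
	return &registry{items: map[string]trackable{}}
}

// Attach registers t and returns the ID needed to detach it later.
// Assumption: IDs are sequential numbers rendered as strings.
func (r *registry) Attach(t trackable) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := strconv.Itoa(r.next)
	r.items[id] = t
	return id
}

// Detach removes the listener registered under id, if any.
func (r *registry) Detach(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.items, id)
}

// OnNewHead forwards the head to every attached listener.
func (r *registry) OnNewHead(h *header) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, t := range r.items {
		t.OnNewHead(h)
	}
}

// counter counts the heads it has seen.
type counter struct{ seen int }

func (c *counter) OnNewHead(*header) { c.seen++ }

func main() {
	r := newRegistry()
	c := &counter{}
	id := r.Attach(c)
	r.OnNewHead(&header{Number: 1})
	r.Detach(id)
	r.OnNewHead(&header{Number: 2}) // not delivered: c is detached
	fmt.Println(c.seen)
}
```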
func (*HeadTracker) Disconnect ¶
func (ht *HeadTracker) Disconnect()
func (*HeadTracker) Get ¶
func (ht *HeadTracker) Get() *models.IndexableBlockNumber
Returns the latest block number being tracked, or nil.
func (*HeadTracker) IsConnected ¶
func (ht *HeadTracker) IsConnected() bool
func (*HeadTracker) OnNewHead ¶
func (ht *HeadTracker) OnNewHead(head *models.BlockHeader)
func (*HeadTracker) Save ¶
func (ht *HeadTracker) Save(n *models.IndexableBlockNumber) error
Updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
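A mutex-guarded "keep the highest value" update is one way to get the behaviour described for Save and Get. The sketch below uses a plain int64 instead of models.IndexableBlockNumber and a persist callback instead of the ORM; the tracker type is hypothetical, and skipping persistence for stale numbers is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for HeadTracker's number handling.
type tracker struct {
	mu      sync.RWMutex
	latest  *int64
	persist func(int64) error // stand-in for writing to the data store
}

// Save records n only when it is higher than the current latest number.
// Assumption: stale numbers are ignored and not persisted.
func (t *tracker) Save(n int64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.latest != nil && n <= *t.latest {
		return nil
	}
	if err := t.persist(n); err != nil {
		return err
	}
	t.latest = &n
	return nil
}

// Get returns a copy of the latest number, or nil if none has been saved.
func (t *tracker) Get() *int64 {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.latest == nil {
		return nil
	}
	n := *t.latest
	return &n
}

func main() {
	t := &tracker{persist: func(int64) error { return nil }}
	var wg sync.WaitGroup
	for i := int64(1); i <= 10; i++ {
		wg.Add(1)
		go func(n int64) {
			defer wg.Done()
			_ = t.Save(n)
		}(i)
	}
	wg.Wait()
	fmt.Println(*t.Get())
}
```

Whatever order the goroutines run in, the highest number wins, because every comparison and update happens under the same lock.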
func (*HeadTracker) Start ¶
func (ht *HeadTracker) Start() error
func (*HeadTracker) Stop ¶
func (ht *HeadTracker) Stop() error
type JobRunnerError ¶
type JobRunnerError struct {
// contains filtered or unexported fields
}
JobRunnerError contains the field for the error message.
func (JobRunnerError) Error ¶
func (err JobRunnerError) Error() string
Error returns the error message for the run.
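A struct that carries a message and implements Error() can be detected by callers with errors.As, even when wrapped. The sketch below uses a hypothetical lowercase jobRunnerError with an assumed msg field, since the real type's fields are unexported and not shown here; run and isJobRunnerError are also hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// jobRunnerError is a hypothetical mirror of JobRunnerError.
// Assumption: the unexported field is a plain message string.
type jobRunnerError struct{ msg string }

// Error returns the stored message, satisfying the error interface.
func (e jobRunnerError) Error() string { return e.msg }

// run is a hypothetical operation that fails with a wrapped jobRunnerError.
func run(fail bool) error {
	if fail {
		return fmt.Errorf("executing run: %w", jobRunnerError{msg: "job has ended"})
	}
	return nil
}

// isJobRunnerError reports whether err is, or wraps, a jobRunnerError.
func isJobRunnerError(err error) bool {
	var jre jobRunnerError
	return errors.As(err, &jre)
}

func main() {
	err := run(true)
	fmt.Println(err, isJobRunnerError(err))
}
```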
type JobSubscription ¶
Listens to event logs being pushed from the Ethereum Node specific to a job.
func StartJobSubscription ¶
func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (JobSubscription, error)
Constructor of JobSubscription that starts listening to and keeps track of event logs corresponding to a job.
func (JobSubscription) Unsubscribe ¶
func (js JobSubscription) Unsubscribe()
Stops the subscription and cleans up associated resources.
type NoOpHeadTrackable ¶
type NoOpHeadTrackable struct{}
func (NoOpHeadTrackable) Connect ¶
func (NoOpHeadTrackable) Connect() error
func (NoOpHeadTrackable) Disconnect ¶
func (NoOpHeadTrackable) Disconnect()
func (NoOpHeadTrackable) OnNewHead ¶
func (NoOpHeadTrackable) OnNewHead(*models.BlockHeader)
type Nower ¶
Nower is an interface that fulfills the Now method, following the behavior of time.Now.
type OneTime ¶
OneTime represents runs that are to be executed only once.
func (*OneTime) RunJobAt ¶
RunJobAt waits until the Stop() function has been called on the run or the specified time for the run has passed.
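Waiting on "either the scheduled time or a stop signal" maps naturally onto a select over two channels. The sketch below is an illustration under that assumption, not the package's implementation; runAt and fired are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// runAt blocks until fire delivers a value or done is closed.
// It calls run and returns true only when the scheduled time arrived first.
func runAt(fire <-chan time.Time, done <-chan struct{}, run func()) bool {
	select {
	case <-fire:
		run()
		return true
	case <-done:
		return false
	}
}

// fired returns a channel that is already readable, standing in for a timer
// that has expired.
func fired() <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- time.Now()
	return ch
}

func main() {
	// The scheduled time has arrived and nobody stops the run: the job runs.
	fmt.Println(runAt(fired(), make(chan struct{}), func() { fmt.Println("job ran") }))

	// Stop is signalled long before the scheduled time: the job never runs.
	done := make(chan struct{})
	close(done)
	fmt.Println(runAt(time.After(time.Hour), done, func() { fmt.Println("job ran") }))
}
```

Passing the timer channel in, rather than calling time.After inside runAt, is what lets an Afterer-style fake drive the function in tests.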
type RPCLogEvent ¶
type RPCLogEvent struct {
	Log       types.Log
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}
Encapsulates all information as a result of a received log from an RPCLogSubscription.
func (RPCLogEvent) EthLogJSON ¶
func (le RPCLogEvent) EthLogJSON() (models.JSON, error)
Reformat the log as JSON.
func (RPCLogEvent) ForLogger ¶
func (le RPCLogEvent) ForLogger(kvs ...interface{}) []interface{}
ForLogger formats the RPCLogEvent for easy common formatting in logs (trace statements, not ethereum events).
func (RPCLogEvent) RunLogJSON ¶
func (le RPCLogEvent) RunLogJSON() (models.JSON, error)
Extract data from the log's topics and data specific to the format defined by RunLogs.
func (RPCLogEvent) ValidateRunLog ¶
func (le RPCLogEvent) ValidateRunLog() bool
Return whether or not the contained log is a RunLog, a specific Chainlink event trigger from smart contracts.
type RPCLogSubscription ¶
type RPCLogSubscription struct {
	Job        models.JobSpec
	Initiator  models.Initiator
	ReceiveLog func(RPCLogEvent)
	// contains filtered or unexported fields
}
Encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the ReceiveLog callback using a strategy pattern.
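The strategy pattern mentioned here amounts to one subscription type whose behaviour is selected by the callback it is given. The sketch below shows the shape of that delegation with hypothetical types (logEvent, subscription) and constructors; it does not connect to an Ethereum node, and the string prefixes are purely illustrative.

```go
package main

import "fmt"

// logEvent is a hypothetical stand-in for RPCLogEvent.
type logEvent struct{ Data string }

// subscription is a hypothetical stand-in for RPCLogSubscription: shared
// plumbing plus an initiator-specific callback.
type subscription struct {
	ReceiveLog func(logEvent)
}

// deliver hands a received log to whichever strategy was configured.
func (s subscription) deliver(le logEvent) {
	s.ReceiveLog(le)
}

// newEthLogSubscription wires in the handling for plain Ethereum logs.
func newEthLogSubscription(out *[]string) subscription {
	return subscription{ReceiveLog: func(le logEvent) {
		*out = append(*out, "ethlog:"+le.Data)
	}}
}

// newRunLogSubscription wires in the handling for RunLogs.
func newRunLogSubscription(out *[]string) subscription {
	return subscription{ReceiveLog: func(le logEvent) {
		*out = append(*out, "runlog:"+le.Data)
	}}
}

func main() {
	var handled []string
	subs := []subscription{
		newEthLogSubscription(&handled),
		newRunLogSubscription(&handled),
	}
	for _, sub := range subs {
		sub.deliver(logEvent{Data: "0x01"})
	}
	fmt.Println(handled)
}
```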
func NewRPCLogSubscription ¶
func NewRPCLogSubscription(
	initr models.Initiator,
	job models.JobSpec,
	head *models.IndexableBlockNumber,
	store *store.Store,
	callback func(RPCLogEvent),
) (RPCLogSubscription, error)
Create a new RPCLogSubscription that feeds received logs to the callback func parameter.
func (RPCLogSubscription) Unsubscribe ¶
func (sub RPCLogSubscription) Unsubscribe()
Close channels and clean up resources.
type Recurring ¶
Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().
func NewRecurring ¶
NewRecurring creates a new instance of Recurring, ready to use.
func (*Recurring) AddJob ¶
AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.
type Scheduler ¶
type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}
Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.
func NewScheduler ¶
NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.
func (*Scheduler) AddJob ¶
AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.
type Unsubscriber ¶
type Unsubscriber interface {
Unsubscribe()
}
Interface implemented by all subscriptions, allowing each one to be stopped through Unsubscribe.
func StartEthLogSubscription ¶
func StartEthLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)
Starts an RPCLogSubscription tailored for use with EthLogs.
func StartRunLogSubscription ¶
func StartRunLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)
Starts an RPCLogSubscription tailored for use with RunLogs.