Documentation ¶
Overview ¶
Package services contain the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.
Application ¶
The Application is the main component used for starting and stopping the Chainlink node.
JobRunner ¶
The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.
JobSubscriber ¶
The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.
Scheduler ¶
The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.
Index ¶
- func ExecuteJob(job models.JobSpec, initiator models.Initiator, input models.RunResult, ...) (*models.JobRun, error)
- func ExecuteJobWithRunRequest(job models.JobSpec, initiator models.Initiator, input models.RunResult, ...) (*models.JobRun, error)
- func NewRun(job models.JobSpec, initiator models.Initiator, input models.RunResult, ...) (*models.JobRun, error)
- func QueueSleepingTask(run *models.JobRun, store *store.Store) error
- func ReceiveLogRequest(store *strpkg.Store, le models.LogRequest)
- func ResumeConfirmingTask(run *models.JobRun, store *store.Store, currentBlockHeight *big.Int) error
- func ResumeConnectingTask(run *models.JobRun, store *store.Store) error
- func ResumePendingTask(run *models.JobRun, store *store.Store, input models.RunResult) error
- func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error
- func ValidateInitiator(i models.Initiator, j models.JobSpec) error
- func ValidateJob(j models.JobSpec, store *store.Store) error
- func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
- type Application
- type ChainlinkApplication
- func (app *ChainlinkApplication) AddJob(job models.JobSpec) error
- func (app *ChainlinkApplication) AddServiceAgreement(sa *models.ServiceAgreement) error
- func (app *ChainlinkApplication) ArchiveJob(ID string) error
- func (app *ChainlinkApplication) GetStore() *store.Store
- func (app *ChainlinkApplication) NewBox() packr.Box
- func (app *ChainlinkApplication) Start() error
- func (app *ChainlinkApplication) Stop() error
- func (app *ChainlinkApplication) WakeSessionReaper()
- type Cron
- type HeadTracker
- type InitiatorSubscription
- type JobRunner
- type JobSubscriber
- type JobSubscription
- type ManagedSubscription
- type OneTime
- type Recurring
- type RecurringScheduleJobError
- type Scheduler
- type SleeperTask
- type Unsubscriber
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteJob ¶
func ExecuteJob( job models.JobSpec, initiator models.Initiator, input models.RunResult, creationHeight *big.Int, store *store.Store) (*models.JobRun, error)
ExecuteJob saves and immediately begins executing a run for a specified job if it is ready.
func ExecuteJobWithRunRequest ¶
func ExecuteJobWithRunRequest( job models.JobSpec, initiator models.Initiator, input models.RunResult, creationHeight *big.Int, store *store.Store, runRequest models.RunRequest) (*models.JobRun, error)
ExecuteJobWithRunRequest saves and immediately begins executing a run for a specified job if it is ready, assigning the passed initiator run.
func NewRun ¶
func NewRun( job models.JobSpec, initiator models.Initiator, input models.RunResult, currentHeight *big.Int, store *store.Store) (*models.JobRun, error)
NewRun returns a run from an input job, in an initial state ready for processing by the job runner system
func QueueSleepingTask ¶
QueueSleepingTask creates a go routine which will wake up the job runner once the sleep's time has elapsed
func ReceiveLogRequest ¶
func ReceiveLogRequest(store *strpkg.Store, le models.LogRequest)
ReceiveLogRequest parses the log and runs the job indicated by a RunLog or ServiceAgreementExecutionLog. (Both log events have the same format.)
func ResumeConfirmingTask ¶
func ResumeConfirmingTask( run *models.JobRun, store *store.Store, currentBlockHeight *big.Int, ) error
ResumeConfirmingTask resumes a confirming run if the minimum confirmations have been met
func ResumeConnectingTask ¶
ResumeConnectingTask resumes a run that was left in pending_connection.
func ResumePendingTask ¶
ResumePendingTask takes the body provided from an external adapter, saves it for the next task to process, then tells the job runner to execute it
func ValidateBridgeType ¶
func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error
ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name or invalid url
func ValidateInitiator ¶
ValidateInitiator checks the Initiator for any application logic errors.
func ValidateJob ¶
ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.
func ValidateServiceAgreement ¶
func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.
Types ¶
type Application ¶
type Application interface { Start() error Stop() error GetStore() *store.Store WakeSessionReaper() AddJob(job models.JobSpec) error ArchiveJob(ID string) error AddServiceAgreement(*models.ServiceAgreement) error NewBox() packr.Box }
Application implements the common functions used in the core node.
func NewApplication ¶
func NewApplication(config store.Config, onConnectCallbacks ...func(Application)) Application
NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), the logger at the same directory and returns the Application to be used by the node.
type ChainlinkApplication ¶
type ChainlinkApplication struct { Exiter func(int) HeadTracker *HeadTracker JobRunner JobRunner JobSubscriber JobSubscriber Scheduler *Scheduler Store *store.Store SessionReaper SleeperTask // contains filtered or unexported fields }
ChainlinkApplication contains fields for the JobSubscriber, Scheduler, and Store. The JobSubscriber and Scheduler are also available in the services package, but the Store has its own package.
func (*ChainlinkApplication) AddJob ¶
func (app *ChainlinkApplication) AddJob(job models.JobSpec) error
AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.
func (*ChainlinkApplication) AddServiceAgreement ¶
func (app *ChainlinkApplication) AddServiceAgreement(sa *models.ServiceAgreement) error
AddServiceAgreement adds a Service Agreement which includes a job that needs to be scheduled.
func (*ChainlinkApplication) ArchiveJob ¶
func (app *ChainlinkApplication) ArchiveJob(ID string) error
ArchiveJob silences the job from the system, preventing future job runs.
func (*ChainlinkApplication) GetStore ¶
func (app *ChainlinkApplication) GetStore() *store.Store
GetStore returns the pointer to the store for the ChainlinkApplication.
func (*ChainlinkApplication) NewBox ¶
func (app *ChainlinkApplication) NewBox() packr.Box
NewBox returns the packr.Box instance that holds the static assets to be delivered by the router.
func (*ChainlinkApplication) Start ¶
func (app *ChainlinkApplication) Start() error
Start runs the JobSubscriber and Scheduler. If successful, nil will be returned. Also listens for interrupt signals from the operating system so that the application can be properly closed before the application exits.
func (*ChainlinkApplication) Stop ¶
func (app *ChainlinkApplication) Stop() error
Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.
func (*ChainlinkApplication) WakeSessionReaper ¶
func (app *ChainlinkApplication) WakeSessionReaper()
WakeSessionReaper wakes up the reaper to do its reaping.
type Cron ¶
Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
type HeadTracker ¶
type HeadTracker struct {
// contains filtered or unexported fields
}
HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.
func NewHeadTracker ¶
func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker
NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.
func (*HeadTracker) Connected ¶
func (ht *HeadTracker) Connected() bool
Connected returns whether or not this HeadTracker is connected.
func (*HeadTracker) Head ¶
func (ht *HeadTracker) Head() *models.Head
Head returns the latest block header being tracked, or nil.
func (*HeadTracker) Save ¶
func (ht *HeadTracker) Save(n *models.Head) error
Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
func (*HeadTracker) Start ¶
func (ht *HeadTracker) Start() error
Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.
func (*HeadTracker) Stop ¶
func (ht *HeadTracker) Stop() error
Stop unsubscribes all connections and fires Disconnect.
type InitiatorSubscription ¶
type InitiatorSubscription struct { *ManagedSubscription Job models.JobSpec Initiator models.Initiator // contains filtered or unexported fields }
InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.
func NewInitiatorSubscription ¶
func NewInitiatorSubscription( initr models.Initiator, job models.JobSpec, store *strpkg.Store, from *models.Head, callback func(*strpkg.Store, models.LogRequest), ) (InitiatorSubscription, error)
NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.
type JobRunner ¶
type JobRunner interface { Start() error Stop() // contains filtered or unexported methods }
JobRunner safely handles coordinating job runs.
func NewJobRunner ¶
NewJobRunner initializes a JobRunner.
type JobSubscriber ¶
type JobSubscriber interface { store.HeadTrackable AddJob(job models.JobSpec, bn *models.Head) error RemoveJob(ID string) error Jobs() []models.JobSpec }
JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.
func NewJobSubscriber ¶
func NewJobSubscriber(store *store.Store) JobSubscriber
NewJobSubscriber returns a new job subscriber.
type JobSubscription ¶
JobSubscription listens to event logs being pushed from the Ethereum Node to a job.
func StartJobSubscription ¶
func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store) (JobSubscription, error)
StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.
func (JobSubscription) Unsubscribe ¶
func (js JobSubscription) Unsubscribe()
Unsubscribe stops the subscription and cleans up associated resources.
type ManagedSubscription ¶
type ManagedSubscription struct {
// contains filtered or unexported fields
}
ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.
func NewManagedSubscription ¶
func NewManagedSubscription( store *strpkg.Store, filter ethereum.FilterQuery, callback func(models.Log), ) (*ManagedSubscription, error)
NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.
func (ManagedSubscription) Unsubscribe ¶
func (sub ManagedSubscription) Unsubscribe()
Unsubscribe closes channels and cleans up resources.
type OneTime ¶
type OneTime struct { Store *store.Store Clock utils.Afterer // contains filtered or unexported fields }
OneTime represents runs that are to be executed only once.
func (*OneTime) RunJobAt ¶
RunJobAt wait until the Stop() function has been called on the run or the specified time for the run is after the present time.
type Recurring ¶
Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().
func NewRecurring ¶
NewRecurring create a new instance of Recurring, ready to use.
func (*Recurring) AddJob ¶
AddJob looks for "cron" initiators, adds them to cron's schedule for execution when specified.
type RecurringScheduleJobError ¶
type RecurringScheduleJobError struct {
// contains filtered or unexported fields
}
RecurringScheduleJobError contains the field for the error message.
func (RecurringScheduleJobError) Error ¶
func (err RecurringScheduleJobError) Error() string
Error returns the error message for the run.
type Scheduler ¶
type Scheduler struct { Recurring *Recurring OneTime *OneTime // contains filtered or unexported fields }
Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.
func NewScheduler ¶
NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.
func (*Scheduler) AddJob ¶
AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.
type SleeperTask ¶
SleeperTask represents a task that waits in the background to process some work.
func NewSleeperTask ¶
func NewSleeperTask(worker Worker) SleeperTask
NewSleeperTask takes a worker and returns a SleeperTask.
SleeperTask is guaranteed to call Work on the worker at least once for every WakeUp call. If the Worker is busy when WakeUp is called, the Worker will be called again immediately after it is finished. For this reason you should take care to make sure that Worker is idempotent. WakeUp does not block.
func NewStoreReaper ¶
func NewStoreReaper(store *store.Store) SleeperTask
NewStoreReaper creates a reaper that cleans stale objects from the store.
type Unsubscriber ¶
type Unsubscriber interface {
Unsubscribe()
}
Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package mock_services is a generated GoMock package.
|
Package mock_services is a generated GoMock package. |