services

package
Version: v0.6.2
Published: Jun 27, 2019 | License: MIT | Imports: 22 | Imported by: 4

Documentation

Overview

Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, JobSubscriber, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. It also executes the tasks within each Run.

JobSubscriber

The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteJob

func ExecuteJob(
	job models.JobSpec,
	initiator models.Initiator,
	input models.RunResult,
	creationHeight *big.Int,
	store *store.Store) (*models.JobRun, error)

ExecuteJob saves and immediately begins executing a run for a specified job if it is ready.

func ExecuteJobWithRunRequest

func ExecuteJobWithRunRequest(
	job models.JobSpec,
	initiator models.Initiator,
	input models.RunResult,
	creationHeight *big.Int,
	store *store.Store,
	runRequest models.RunRequest) (*models.JobRun, error)

ExecuteJobWithRunRequest saves and immediately begins executing a run for a specified job if it is ready, assigning the passed initiator run.

func NewRun

func NewRun(
	job models.JobSpec,
	initiator models.Initiator,
	input models.RunResult,
	currentHeight *big.Int,
	store *store.Store) (*models.JobRun, error)

NewRun returns a run from an input job, in an initial state ready for processing by the job runner system.

func QueueSleepingTask

func QueueSleepingTask(
	run *models.JobRun,
	store *store.Store,
) error

QueueSleepingTask creates a goroutine which will wake up the job runner once the sleep's time has elapsed.

func ReceiveLogRequest

func ReceiveLogRequest(store *strpkg.Store, le models.LogRequest)

ReceiveLogRequest parses the log and runs the job indicated by a RunLog or ServiceAgreementExecutionLog. (Both log events have the same format.)

func ResumeConfirmingTask

func ResumeConfirmingTask(
	run *models.JobRun,
	store *store.Store,
	currentBlockHeight *big.Int,
) error

ResumeConfirmingTask resumes a confirming run if the minimum confirmations have been met.
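The confirmation check can be pictured with a minimal sketch. The helper confirmationsMet is hypothetical and not part of this package, plain int64 heights stand in for *big.Int, and the counting rule (the creation block counts as the first confirmation) is an assumption rather than the package's documented behavior.

```go
package main

import "fmt"

// confirmationsMet reports whether a run created at creationHeight has
// accumulated at least minConfs confirmations by currentHeight.
// Assumption: the creation block itself counts as the first confirmation.
func confirmationsMet(creationHeight, currentHeight, minConfs int64) bool {
	confs := currentHeight - creationHeight + 1
	return confs >= minConfs
}

func main() {
	// Only two confirmations so far, so a minimum of three is not met.
	fmt.Println(confirmationsMet(100, 101, 3))
	// The third confirmation arrives one block later.
	fmt.Println(confirmationsMet(100, 102, 3))
}
```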

func ResumeConnectingTask

func ResumeConnectingTask(
	run *models.JobRun,
	store *store.Store,
) error

ResumeConnectingTask resumes a run that was left in pending_connection.

func ResumePendingTask

func ResumePendingTask(
	run *models.JobRun,
	store *store.Store,
	input models.RunResult,
) error

ResumePendingTask takes the body provided from an external adapter, saves it for the next task to process, then tells the job runner to execute it.

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

Types

type Application

type Application interface {
	Start() error
	Stop() error
	GetStore() *store.Store
	WakeSessionReaper()
	AddJob(job models.JobSpec) error
	ArchiveJob(ID string) error
	AddServiceAgreement(*models.ServiceAgreement) error
	NewBox() packr.Box
}

Application implements the common functions used in the core node.

func NewApplication

func NewApplication(config store.Config, onConnectCallbacks ...func(Application)) Application

NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), initializes the logger at the same directory, and returns the Application to be used by the node.

type ChainlinkApplication

type ChainlinkApplication struct {
	Exiter        func(int)
	HeadTracker   *HeadTracker
	JobRunner     JobRunner
	JobSubscriber JobSubscriber
	Scheduler     *Scheduler
	Store         *store.Store
	SessionReaper SleeperTask
	// contains filtered or unexported fields
}

ChainlinkApplication contains fields for the JobSubscriber, Scheduler, and Store. The JobSubscriber and Scheduler are also available in the services package, but the Store has its own package.

func (*ChainlinkApplication) AddJob

func (app *ChainlinkApplication) AddJob(job models.JobSpec) error

AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.
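The ordering described here (persist first, schedule only on success) can be sketched as follows. All types in this example (jobSaver, jobScheduler, memStore, memScheduler) are hypothetical stand-ins, not the package's real store or scheduler, and jobs are reduced to string IDs.

```go
package main

import (
	"errors"
	"fmt"
)

// jobSaver and jobScheduler are hypothetical interfaces standing in for
// the store and the scheduler; they are not the package's real types.
type jobSaver interface{ SaveJob(id string) error }
type jobScheduler interface{ AddJob(id string) }

// memStore is an in-memory store that can be told to fail.
type memStore struct {
	fail bool
	jobs []string
}

func (m *memStore) SaveJob(id string) error {
	if m.fail {
		return errors.New("store unavailable")
	}
	m.jobs = append(m.jobs, id)
	return nil
}

// memScheduler records every job it was asked to schedule.
type memScheduler struct{ jobs []string }

func (m *memScheduler) AddJob(id string) { m.jobs = append(m.jobs, id) }

// addJob persists first and schedules only after the save succeeded.
func addJob(st jobSaver, sch jobScheduler, id string) error {
	if err := st.SaveJob(id); err != nil {
		return err
	}
	sch.AddJob(id)
	return nil
}

func main() {
	sch := &memScheduler{}
	err := addJob(&memStore{fail: true}, sch, "job-1")
	// The save failed, so the scheduler never saw the job.
	fmt.Println(err, len(sch.jobs))
}
```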

func (*ChainlinkApplication) AddServiceAgreement

func (app *ChainlinkApplication) AddServiceAgreement(sa *models.ServiceAgreement) error

AddServiceAgreement adds a Service Agreement which includes a job that needs to be scheduled.

func (*ChainlinkApplication) ArchiveJob

func (app *ChainlinkApplication) ArchiveJob(ID string) error

ArchiveJob silences the job from the system, preventing future job runs.

func (*ChainlinkApplication) GetStore

func (app *ChainlinkApplication) GetStore() *store.Store

GetStore returns the pointer to the store for the ChainlinkApplication.

func (*ChainlinkApplication) NewBox

func (app *ChainlinkApplication) NewBox() packr.Box

NewBox returns the packr.Box instance that holds the static assets to be delivered by the router.

func (*ChainlinkApplication) Start

func (app *ChainlinkApplication) Start() error

Start runs the JobSubscriber and Scheduler and returns nil if successful. It also listens for interrupt signals from the operating system so that the application can be properly closed before it exits.

func (*ChainlinkApplication) Stop

func (app *ChainlinkApplication) Stop() error

Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.

func (*ChainlinkApplication) WakeSessionReaper

func (app *ChainlinkApplication) WakeSessionReaper()

WakeSessionReaper wakes up the reaper to do its reaping.

type Cron

type Cron interface {
	Start()
	Stop()
	AddFunc(string, func()) error
}

Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
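To make the seconds-first schedule format concrete, here is a minimal sketch that splits a spec into its six positions. The helper describeSpec and the field names are hypothetical and not part of this package; a real Cron implementation validates far more than the field count.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames lists the six positions of a seconds-first cron spec.
// The names are illustrative and not taken from the package.
var fieldNames = []string{"second", "minute", "hour", "day-of-month", "month", "day-of-week"}

// describeSpec splits a spec on whitespace and pairs each field with its
// name. It returns an error if the spec does not have exactly six fields.
func describeSpec(spec string) (map[string]string, error) {
	fields := strings.Fields(spec)
	if len(fields) != len(fieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(fieldNames), len(fields))
	}
	out := make(map[string]string, len(fields))
	for i, f := range fields {
		out[fieldNames[i]] = f
	}
	return out, nil
}

func main() {
	// Second 0 of every fifth minute, in the seconds-first layout.
	parts, err := describeSpec("0 */5 * * * *")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(parts["second"], parts["minute"])
}
```

A standard five-field spec such as "*/5 * * * *" is rejected by this sketch, which is the practical consequence of the extra leading field.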

type HeadTracker

type HeadTracker struct {
	// contains filtered or unexported fields
}

HeadTracker holds and stores the latest block number seen by this particular node in a thread-safe manner. It restores the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker

NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. It can optionally be passed a sleeper object that dictates how often it tries to reconnect.

func (*HeadTracker) Connected

func (ht *HeadTracker) Connected() bool

Connected returns whether or not this HeadTracker is connected.

func (*HeadTracker) Head

func (ht *HeadTracker) Head() *models.Head

Head returns the latest block header being tracked, or nil.

func (*HeadTracker) Save

func (ht *HeadTracker) Save(n *models.Head) error

Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

Stop unsubscribes all connections and fires Disconnect.

type InitiatorSubscription

type InitiatorSubscription struct {
	*ManagedSubscription
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an Ethereum subscription for use with a Chainlink Initiator. Initiator-specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	job models.JobSpec,
	store *strpkg.Store,
	from *models.Head,
	callback func(*strpkg.Store, models.LogRequest),
) (InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

type JobRunner

type JobRunner interface {
	Start() error
	Stop()
	// contains filtered or unexported methods
}

JobRunner safely handles coordinating job runs.

func NewJobRunner

func NewJobRunner(str *store.Store) JobRunner

NewJobRunner initializes a JobRunner.

type JobSubscriber

type JobSubscriber interface {
	store.HeadTrackable
	AddJob(job models.JobSpec, bn *models.Head) error
	RemoveJob(ID string) error
	Jobs() []models.JobSpec
}

JobSubscriber listens for push notifications of event logs from the Ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.
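The "ignore errors if at least one subscription succeeds" policy might look like the sketch below. The helper subscribeAll, the placeholder error errSubscribe, and the initiator names are all hypothetical; what the real function returns when every subscription fails is an assumption here (the first error encountered).

```go
package main

import (
	"errors"
	"fmt"
)

// errSubscribe is a placeholder error used only by this sketch.
var errSubscribe = errors.New("subscription failed")

// subscribeAll calls subscribe for every initiator name. It returns the
// number of successful subscriptions, and an error only when none succeeded.
func subscribeAll(initiators []string, subscribe func(string) error) (int, error) {
	var firstErr error
	ok := 0
	for _, name := range initiators {
		if err := subscribe(name); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		ok++
	}
	if ok == 0 && firstErr != nil {
		return 0, firstErr
	}
	return ok, nil
}

func main() {
	// The first illustrative initiator fails, the second succeeds,
	// so the overall call reports success.
	n, err := subscribeAll([]string{"first", "second"}, func(name string) error {
		if name == "first" {
			return errSubscribe
		}
		return nil
	})
	fmt.Println(n, err)
}
```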

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type ManagedSubscription

type ManagedSubscription struct {
	// contains filtered or unexported fields
}

ManagedSubscription encapsulates the connecting, backfilling, and cleanup of an Ethereum node subscription.

func NewManagedSubscription

func NewManagedSubscription(
	store *strpkg.Store,
	filter ethereum.FilterQuery,
	callback func(models.Log),
) (*ManagedSubscription, error)

NewManagedSubscription subscribes to the Ethereum node with the passed filter and delegates incoming logs to callback.

func (ManagedSubscription) Unsubscribe

func (sub ManagedSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type OneTime

type OneTime struct {
	Store *store.Store
	Clock utils.Afterer
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initr models.Initiator, job models.JobSpec)

RunJobAt waits until either Stop() has been called or the time specified for the run has passed, whichever comes first.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.
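The interplay of Start, Stop, and RunJobAt around the "done" channel can be sketched as below. The oneTime type here is a hypothetical simplification, not the package's OneTime: it takes a plain delay and a callback instead of an initiator and a job, and uses time.After directly rather than an injected Clock.

```go
package main

import (
	"fmt"
	"time"
)

// oneTime is a hypothetical, simplified stand-in for a one-shot scheduler.
type oneTime struct {
	done chan struct{}
}

// start allocates the done channel.
func (ot *oneTime) start() { ot.done = make(chan struct{}) }

// stop closes the done channel, releasing every pending wait.
func (ot *oneTime) stop() { close(ot.done) }

// runJobAt waits for delay to elapse and then calls run, unless stop is
// called first. It reports whether run was called.
func (ot *oneTime) runJobAt(delay time.Duration, run func()) bool {
	select {
	case <-time.After(delay):
		run()
		return true
	case <-ot.done:
		return false
	}
}

func main() {
	ot := &oneTime{}
	ot.start()
	ran := ot.runJobAt(10*time.Millisecond, func() { fmt.Println("job executed") })
	fmt.Println("ran:", ran)

	// After stop, a pending wait returns without running the job.
	ot.stop()
	ran = ot.runJobAt(time.Hour, func() { fmt.Println("never printed") })
	fmt.Println("ran:", ran)
}
```

Closing the channel rather than sending on it is what lets a single stop call release any number of waiting goroutines.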

type Recurring

type Recurring struct {
	Cron  Cron
	Clock utils.Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(store *store.Store) *Recurring

NewRecurring creates a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
	// contains filtered or unexported fields
}

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for both the Recurring and OneTime Stop functions. It sets the started field to false.
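The started flag that Start and Stop maintain can be sketched with a mutex-guarded boolean. The scheduler type below is a hypothetical, stripped-down stand-in that models only the flag; reporting a second start as an error is an assumption about the behavior, not something this page states.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// scheduler is a hypothetical stand-in that models only the started flag.
type scheduler struct {
	mu      sync.Mutex
	started bool
}

// start flips the flag, refusing a second start.
// Assumption: a double start is reported as an error.
func (s *scheduler) start() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.started {
		return errors.New("scheduler already started")
	}
	s.started = true
	return nil
}

// stop clears the flag so the scheduler can be started again.
func (s *scheduler) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.started = false
}

func main() {
	s := &scheduler{}
	fmt.Println(s.start()) // first start succeeds
	fmt.Println(s.start()) // second start is rejected
	s.stop()
	fmt.Println(s.start()) // start works again after stop
}
```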

type SleeperTask

type SleeperTask interface {
	Start() error
	Stop() error
	WakeUp()
}

SleeperTask represents a task that waits in the background to process some work.

func NewSleeperTask

func NewSleeperTask(worker Worker) SleeperTask

NewSleeperTask takes a worker and returns a SleeperTask.

SleeperTask is guaranteed to call Work on the worker at least once for every WakeUp call. If the Worker is busy when WakeUp is called, the Worker will be called again immediately after it is finished. For this reason you should take care to make sure that Worker is idempotent. WakeUp does not block.
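One common way to obtain these guarantees is a wake-up channel with a buffer of one. The sketch below illustrates that technique and is not the package's implementation; the sleeper, worker, and chanWorker types are hypothetical.

```go
package main

import "fmt"

// worker mirrors the shape of the Worker interface for this sketch.
type worker interface{ Work() }

// chanWorker is a hypothetical Worker that signals each time it runs.
type chanWorker struct{ ran chan struct{} }

func (w *chanWorker) Work() { w.ran <- struct{}{} }

// sleeper is an illustrative stand-in, not the package's SleeperTask.
type sleeper struct {
	w    worker
	wake chan struct{} // buffer of 1 holds at most one pending wake-up
	quit chan struct{}
	done chan struct{}
}

func newSleeper(w worker) *sleeper {
	s := &sleeper{
		w:    w,
		wake: make(chan struct{}, 1),
		quit: make(chan struct{}),
		done: make(chan struct{}),
	}
	go s.loop()
	return s
}

// WakeUp never blocks: if a wake-up is already pending, the new one is
// merged into it, and the pending one still triggers a later Work call.
func (s *sleeper) WakeUp() {
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// loop runs Work once per received wake-up until quit is closed.
func (s *sleeper) loop() {
	defer close(s.done)
	for {
		select {
		case <-s.wake:
			s.w.Work()
		case <-s.quit:
			return
		}
	}
}

// Stop ends the loop and waits for it to exit.
func (s *sleeper) Stop() {
	close(s.quit)
	<-s.done
}

func main() {
	w := &chanWorker{ran: make(chan struct{}, 8)}
	s := newSleeper(w)
	s.WakeUp()
	<-w.ran // blocks until Work has run
	fmt.Println("worker ran")
	s.Stop()
}
```

Because wake-ups that arrive while the worker is busy are merged, several WakeUp calls may produce fewer Work calls than wake-ups, which is why an idempotent Worker matters.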

func NewStoreReaper

func NewStoreReaper(store *store.Store) SleeperTask

NewStoreReaper creates a reaper that cleans stale objects from the store.

type Unsubscriber

type Unsubscriber interface {
	Unsubscribe()
}

Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.

type Worker

type Worker interface {
	Work()
}

Worker is a simple interface that represents some work to do repeatedly.

Directories

Path Synopsis
Package mock_services is a generated GoMock package.
