services

package
v0.0.0-...-537fcec Latest
Published: Mar 12, 2018 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, EthereumListener, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. The tasks within each Run are also executed by the JobRunner.

EthereumListener

The EthereumListener coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Constants

const (
	EventTopicSignature = iota
	EventTopicRequestID
	EventTopicJobID
)

Descriptive indices of a RunLog's Topic array
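
As a rough illustration of how these indices might be used, the sketch below reads one entry out of a log's topic list with a bounds check. The constants are redeclared so the example compiles on its own; `topicAt` and the string topic values are hypothetical stand-ins (a real log carries 32-byte hashes), not part of this package.

```go
package main

import "fmt"

// Redeclared here so the sketch is self-contained; the values match the
// iota ordering shown in the Constants section.
const (
	EventTopicSignature = iota
	EventTopicRequestID
	EventTopicJobID
)

// topicAt is a hypothetical helper: it returns the topic at index i,
// or false when the log carries too few topics.
func topicAt(topics []string, i int) (string, bool) {
	if i < 0 || i >= len(topics) {
		return "", false
	}
	return topics[i], true
}

func main() {
	// Placeholder topic values for illustration only.
	topics := []string{"0xsignature", "0xrequestid", "0xjobid"}
	if jobID, ok := topicAt(topics, EventTopicJobID); ok {
		fmt.Println("job id topic:", jobID)
	}
}
```

Naming the positions keeps call sites readable and makes a short topic array an explicit, checkable condition rather than an index panic.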

Variables

var RunLogTopic = common.HexToHash("0x06f4bf36b4e011a5c499cef1113c2d166800ce4013f6c2509cab1a0e92b83fb2")

RunLogTopic is the signature for the Request(uint256,bytes32,string) event which Chainlink RunLog initiators watch for. See https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Oracle.sol

Functions

func BeginRun

func BeginRun(job models.JobSpec, store *store.Store, input models.RunResult) (models.JobRun, error)

BeginRun creates a new run if the job is valid and starts the job.

func BuildRun

func BuildRun(job models.JobSpec, store *store.Store) (models.JobRun, error)

BuildRun checks to ensure the given job has not started or ended before creating a new run for the job.

func ExecuteRun

func ExecuteRun(run models.JobRun, store *store.Store, input models.RunResult) (models.JobRun, error)

ExecuteRun starts the job and executes task runs within that job in the order defined in the run for as long as they do not return errors. Results are saved in the store (db).

func ReceiveEthLog

func ReceiveEthLog(le RPCLogEvent)

Parse the log and run the job specific to this initiator log event.

func ReceiveRunLog

func ReceiveRunLog(le RPCLogEvent)

Parse the log and run the job specific to this initiator log event.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

Types

type Afterer

type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

Afterer is an interface that fulfills the After method, following the behavior of time.After.

type Application

type Application interface {
	Start() error
	Stop() error
	GetStore() *store.Store
}

Application implements the common functions used in the core node.

func NewApplication

func NewApplication(config store.Config) Application

NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), initializes the logger in the same directory, and returns the Application to be used by the node.

type ChainlinkApplication

type ChainlinkApplication struct {
	HeadTracker      *HeadTracker
	EthereumListener *EthereumListener
	Scheduler        *Scheduler
	Store            *store.Store
}

ChainlinkApplication contains fields for the HeadTracker, EthereumListener, Scheduler, and Store. The HeadTracker, EthereumListener, and Scheduler are also available in the services package, but the Store has its own package.

func (*ChainlinkApplication) AddJob

func (app *ChainlinkApplication) AddJob(job models.JobSpec) error

AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.

func (*ChainlinkApplication) GetStore

func (app *ChainlinkApplication) GetStore() *store.Store

GetStore returns the pointer to the store for the ChainlinkApplication.

func (*ChainlinkApplication) Start

func (app *ChainlinkApplication) Start() error

Start runs the Store, EthereumListener, and Scheduler. If successful, nil will be returned.

func (*ChainlinkApplication) Stop

func (app *ChainlinkApplication) Stop() error

Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.

type Cron

type Cron interface {
	Start()
	Stop()
	AddFunc(string, func()) error
}

Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
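
To make the six-field format concrete, here is a minimal sketch of a test double that satisfies the Cron interface. `recordingCron` is hypothetical and only checks the field count; it assumes a strict seconds-first layout, whereas a real scheduler may accept other forms.

```go
package main

import (
	"fmt"
	"strings"
)

// Cron mirrors the interface documented above.
type Cron interface {
	Start()
	Stop()
	AddFunc(string, func()) error
}

// recordingCron is a hypothetical test double: it validates the field
// count and records specs, but never schedules anything.
type recordingCron struct {
	specs []string
}

func (c *recordingCron) Start() {}
func (c *recordingCron) Stop()  {}

// AddFunc accepts only specs with six whitespace-separated fields
// (an assumption of this sketch, with seconds as the first field).
func (c *recordingCron) AddFunc(spec string, fn func()) error {
	if n := len(strings.Fields(spec)); n != 6 {
		return fmt.Errorf("expected 6 fields (seconds first), got %d", n)
	}
	c.specs = append(c.specs, spec)
	return nil
}

func main() {
	var c Cron = &recordingCron{}
	// Fields: seconds, minutes, hours, day of month, month, day of week.
	fmt.Println(c.AddFunc("0 */5 * * * *", func() {})) // six fields
	fmt.Println(c.AddFunc("*/5 * * * *", func() {}))   // five fields, rejected by this sketch
}
```

Depending on an interface rather than a concrete scheduler lets tests substitute a double like this one.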

type EthereumListener

type EthereumListener struct {
	Store       *store.Store
	HeadTracker *HeadTracker
	// contains filtered or unexported fields
}

EthereumListener manages push notifications from the ethereum node's websocket to listen for new heads and log events.

func (*EthereumListener) AddJob

func (el *EthereumListener) AddJob(job models.JobSpec) error

AddJob looks for "runlog" and "ethlog" Initiators for a given job and watches the Ethereum blockchain for the addresses in the job.

func (*EthereumListener) Connect

func (el *EthereumListener) Connect() error

func (*EthereumListener) Disconnect

func (el *EthereumListener) Disconnect()

func (*EthereumListener) Jobs

func (el *EthereumListener) Jobs() []models.JobSpec

func (*EthereumListener) OnNewHead

func (el *EthereumListener) OnNewHead(_ *models.BlockHeader)

func (*EthereumListener) Start

func (el *EthereumListener) Start() error

Start obtains the jobs from the store and subscribes to logs and newHeads in order to start and resume jobs waiting on events or confirmations.

func (*EthereumListener) Stop

func (el *EthereumListener) Stop() error

Stop gracefully closes its access to the store's EthNotifications and resets resources.

type HeadTrackable

type HeadTrackable interface {
	Connect() error
	Disconnect()
	OnNewHead(*models.BlockHeader)
}

type HeadTracker

type HeadTracker struct {
	// contains filtered or unexported fields
}

Holds and stores the latest block number experienced by this particular node in a thread-safe manner. Reconstitutes the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *store.Store, sleepers ...utils.Sleeper) *HeadTracker

Instantiates a new HeadTracker using the ORM to persist new block numbers.

func (*HeadTracker) Attach

func (ht *HeadTracker) Attach(t HeadTrackable) string

func (*HeadTracker) Connect

func (ht *HeadTracker) Connect()

func (*HeadTracker) Detach

func (ht *HeadTracker) Detach(id string)

func (*HeadTracker) Disconnect

func (ht *HeadTracker) Disconnect()

func (*HeadTracker) Get

Returns the latest block header being tracked, or nil.

func (*HeadTracker) IsConnected

func (ht *HeadTracker) IsConnected() bool

func (*HeadTracker) OnNewHead

func (ht *HeadTracker) OnNewHead(head *models.BlockHeader)

func (*HeadTracker) Save

Updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
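
The "update only if newer, under a lock" behavior described here can be sketched as follows. `headStore` and its methods are hypothetical and use a plain `uint64`; the actual HeadTracker types and persistence layer are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// headStore is a hypothetical stand-in for the tracker's in-memory state.
type headStore struct {
	mu     sync.RWMutex
	number uint64
	seen   bool
}

// save records n only when it is newer than the stored number and
// reports whether an update happened.
func (h *headStore) save(n uint64) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen && n <= h.number {
		return false
	}
	h.number, h.seen = n, true
	// A real tracker would also persist n here so it survives a reboot.
	return true
}

// get returns the latest number, or false if none has been saved yet.
func (h *headStore) get() (uint64, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.number, h.seen
}

func main() {
	h := &headStore{}
	var wg sync.WaitGroup
	for _, n := range []uint64{5, 3, 9, 7} {
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			h.save(n)
		}(n)
	}
	wg.Wait()
	n, _ := h.get()
	fmt.Println("latest:", n)
}
```

Because stale numbers are rejected inside the lock, the final value does not depend on the order in which the goroutines run.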

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

type JobRunnerError

type JobRunnerError struct {
	// contains filtered or unexported fields
}

JobRunnerError contains the field for the error message.

func (JobRunnerError) Error

func (err JobRunnerError) Error() string

Error returns the error message for the run.
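
A caller can tell this kind of error apart from others by its type. The sketch below uses a hypothetical `runnerError` with the same shape (one unexported message field and a value-receiver `Error` method); the validation rule is invented purely for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// runnerError is a hypothetical analogue of JobRunnerError.
type runnerError struct {
	msg string
}

// Error makes runnerError satisfy the built-in error interface.
func (e runnerError) Error() string { return e.msg }

// validate returns a runnerError for an empty job ID (illustrative rule only).
func validate(jobID string) error {
	if jobID == "" {
		return runnerError{msg: "job id must not be empty"}
	}
	return nil
}

func main() {
	// Wrap the error, then recover the typed value with errors.As.
	err := fmt.Errorf("begin run: %w", validate(""))
	var re runnerError
	if errors.As(err, &re) {
		fmt.Println("runner error:", re.Error())
	}
}
```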

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

Listens to event logs being pushed from the Ethereum Node specific to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (JobSubscription, error)

Constructor of JobSubscription that starts listening to and keeps track of event logs corresponding to a job.

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Stops the subscription and cleans up associated resources.

type NoOpHeadTrackable

type NoOpHeadTrackable struct{}

func (NoOpHeadTrackable) Connect

func (NoOpHeadTrackable) Connect() error

func (NoOpHeadTrackable) Disconnect

func (NoOpHeadTrackable) Disconnect()

func (NoOpHeadTrackable) OnNewHead

func (NoOpHeadTrackable) OnNewHead(*models.BlockHeader)

type Nower

type Nower interface {
	Now() time.Time
}

Nower is an interface that fulfills the Now method, following the behavior of time.Now.

type OneTime

type OneTime struct {
	Store *store.Store
	Clock Afterer
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(t models.Time, job models.JobSpec)

RunJobAt waits until either Stop() has been called or the time specified for the run has been reached.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.
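
The interplay between the "done" channel and the Afterer clock can be sketched with a `select` statement. Everything below (`oneShot`, `instantClock`, `neverClock`, `runAt`) is hypothetical and only illustrates the pattern; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// Afterer mirrors the interface documented above.
type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

// instantClock is a hypothetical test clock whose After fires immediately.
type instantClock struct{}

func (instantClock) After(time.Duration) <-chan time.Time {
	c := make(chan time.Time, 1)
	c <- time.Time{}
	return c
}

// neverClock never fires, so only the done channel can end the wait.
type neverClock struct{}

func (neverClock) After(time.Duration) <-chan time.Time { return make(chan time.Time) }

// oneShot is a hypothetical analogue of OneTime.
type oneShot struct {
	clock Afterer
	done  chan struct{}
}

func (o *oneShot) start() { o.done = make(chan struct{}) }
func (o *oneShot) stop()  { close(o.done) }

// runAt waits for either the timer or stop, and reports whether run was called.
func (o *oneShot) runAt(d time.Duration, run func()) bool {
	select {
	case <-o.done:
		return false
	case <-o.clock.After(d):
		run()
		return true
	}
}

func main() {
	fired := &oneShot{clock: instantClock{}}
	fired.start()
	fmt.Println(fired.runAt(time.Hour, func() { fmt.Println("job ran") }))

	stopped := &oneShot{clock: neverClock{}}
	stopped.start()
	stopped.stop()
	fmt.Println(stopped.runAt(time.Hour, func() { fmt.Println("not reached") }))
}
```

Injecting the clock through an interface is what lets a test replace `time.After` with an instant or never-firing timer instead of sleeping.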

type RPCLogEvent

type RPCLogEvent struct {
	Log       types.Log
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}

Encapsulates all information as a result of a received log from an RPCLogSubscription.

func (RPCLogEvent) EthLogJSON

func (le RPCLogEvent) EthLogJSON() (models.JSON, error)

Reformat the log as JSON.

func (RPCLogEvent) ForLogger

func (le RPCLogEvent) ForLogger(kvs ...interface{}) []interface{}

ForLogger formats the RPCLogEvent for easy common formatting in logs (trace statements, not ethereum events).

func (RPCLogEvent) RunLogJSON

func (le RPCLogEvent) RunLogJSON() (models.JSON, error)

Extract data from the log's topics and data specific to the format defined by RunLogs.

func (RPCLogEvent) ValidateRunLog

func (le RPCLogEvent) ValidateRunLog() bool

Return whether or not the contained log is a RunLog, a specific Chainlink event trigger from smart contracts.

type RPCLogSubscription

type RPCLogSubscription struct {
	Job        models.JobSpec
	Initiator  models.Initiator
	ReceiveLog func(RPCLogEvent)
	// contains filtered or unexported fields
}

Encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the ReceiveLog callback using a strategy pattern.
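
The strategy pattern mentioned here amounts to storing the initiator-specific behavior as a function value. A minimal sketch, with hypothetical `logEvent` and `subscription` types standing in for RPCLogEvent and RPCLogSubscription:

```go
package main

import "fmt"

// logEvent is a hypothetical stand-in for RPCLogEvent.
type logEvent struct {
	Initiator string
	Data      string
}

// subscription is a hypothetical analogue of RPCLogSubscription: the
// Receive field holds the initiator-specific strategy.
type subscription struct {
	Initiator string
	Receive   func(logEvent)
}

// deliver wraps a raw payload in a logEvent and hands it to the strategy.
func (s subscription) deliver(data string) {
	s.Receive(logEvent{Initiator: s.Initiator, Data: data})
}

func main() {
	var handled []string
	ethLog := subscription{Initiator: "ethlog", Receive: func(e logEvent) {
		handled = append(handled, "ethlog:"+e.Data)
	}}
	runLog := subscription{Initiator: "runlog", Receive: func(e logEvent) {
		handled = append(handled, "runlog:"+e.Data)
	}}
	ethLog.deliver("0x01")
	runLog.deliver("0x02")
	fmt.Println(handled)
}
```

The subscription plumbing stays identical for both initiator types; only the callback differs.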

func NewRPCLogSubscription

func NewRPCLogSubscription(
	initr models.Initiator,
	job models.JobSpec,
	head *models.IndexableBlockNumber,
	store *store.Store,
	callback func(RPCLogEvent),
) (RPCLogSubscription, error)

Create a new RPCLogSubscription that feeds received logs to the callback func parameter.

func (RPCLogSubscription) Unsubscribe

func (sub RPCLogSubscription) Unsubscribe()

Close channels and clean up resources.

type Recurring

type Recurring struct {
	Cron  Cron
	Clock Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(store *store.Store) *Recurring

NewRecurring creates a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for both the Recurring and OneTime Stop functions. It also sets the started field to false.
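
The started flag described for Start and Stop can be sketched as a simple guard. The `scheduler` type below is hypothetical, and returning an error on a second Start is an assumption; the documentation only says that Start checks whether the Scheduler has already started.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduler is a hypothetical analogue of Scheduler with only the guard logic.
type scheduler struct {
	started bool
	starts  int // counts successful starts, for illustration
}

// Start refuses to run twice (assumed behavior) and marks the scheduler started.
func (s *scheduler) Start() error {
	if s.started {
		return errors.New("scheduler already started")
	}
	// A real scheduler would start its recurring and one-time runners
	// and load the relevant jobs here.
	s.starts++
	s.started = true
	return nil
}

// Stop clears the flag so the scheduler can be started again.
func (s *scheduler) Stop() { s.started = false }

func main() {
	s := &scheduler{}
	fmt.Println(s.Start()) // first start succeeds
	fmt.Println(s.Start()) // second start is rejected in this sketch
	s.Stop()
	fmt.Println(s.Start()) // start after stop succeeds again
}
```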

type Unsubscriber

type Unsubscriber interface {
	Unsubscribe()
}

Interface implemented by every subscription type, allowing each subscription to be unsubscribed.

func StartEthLogSubscription

func StartEthLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)

Starts an RPCLogSubscription tailored for use with EthLogs.

func StartRunLogSubscription

func StartRunLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)

Starts an RPCLogSubscription tailored for use with RunLogs.
