services

package
v0.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2021 License: MIT Imports: 39 Imported by: 4

Documentation

Overview

Package services contain the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.

JobSubscriber

The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExpectedRecurringScheduleJobError added in v0.8.2

func ExpectedRecurringScheduleJobError(err error) bool

func NewExternalInitiatorManager added in v0.9.10

func NewExternalInitiatorManager() *externalInitiatorManager

NewExternalInitiatorManager returns the concrete externalInitiatorManager

func NewPromReporter added in v0.9.6

func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) store.HeadTrackable

func NewRun

func NewRun(
	job *models.JobSpec,
	initiator *models.Initiator,
	currentHeight *big.Int,
	runRequest *models.RunRequest,
	config orm.ConfigReader,
	orm *orm.ORM,
	now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)

NewRun returns a complete run from a JobSpec

func NewStoreReaper

func NewStoreReaper(store *store.Store) utils.SleeperTask

NewStoreReaper creates a reaper that cleans stale objects from the store.

func ReceiveLogRequest

func ReceiveLogRequest(runManager RunManager, le models.LogRequest)

ReceiveLogRequest parses the log and runs the job it indicated by its GetJobSpecID method

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name or invalid url

func ValidateBridgeTypeNotExist added in v0.8.2

func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeTypeNotExist checks that a bridge has not already been created

func ValidateExternalInitiator added in v0.6.6

func ValidateExternalInitiator(
	exi *models.ExternalInitiatorRequest,
	store *store.Store,
) error

ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateRun added in v0.8.2

func ValidateRun(run *models.JobRun, contractCost *assets.Link)

ValidateRun ensures that a run's initial preconditions have been met

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

Types

type BalanceMonitor added in v0.8.11

type BalanceMonitor interface {
	store.HeadTrackable
	GetEthBalance(gethCommon.Address) *assets.Eth
	Stop() error
}

BalanceMonitor checks the balance for each key on every new head

func NewBalanceMonitor added in v0.8.11

func NewBalanceMonitor(store *store.Store) BalanceMonitor

NewBalanceMonitor returns a new balanceMonitor

type Cron

type Cron interface {
	Start()
	Stop() context.Context
	AddFunc(string, func()) (cron.EntryID, error)
}

type HeadBroadcastable added in v0.10.3

type HeadBroadcastable interface {
	OnNewLongestChain(ctx context.Context, head models.Head)
}

HeadBroadcastable defines the interface for listeners

type HeadBroadcaster added in v0.10.3

type HeadBroadcaster struct {
	utils.StartStopOnce
	// contains filtered or unexported fields
}

HeadBroadcaster relays heads from the head tracker to subscribed jobs, it is less robust against congestion than the head tracker, and missed heads should be expected by consuming jobs

func NewHeadBroadcaster added in v0.10.3

func NewHeadBroadcaster() *HeadBroadcaster

NewHeadBroadcaster creates a new HeadBroadcaster

func (*HeadBroadcaster) Close added in v0.10.3

func (hr *HeadBroadcaster) Close() error

func (*HeadBroadcaster) Connect added in v0.10.3

func (hr *HeadBroadcaster) Connect(head *models.Head) error

func (*HeadBroadcaster) Disconnect added in v0.10.3

func (hr *HeadBroadcaster) Disconnect()

func (*HeadBroadcaster) OnNewLongestChain added in v0.10.3

func (hr *HeadBroadcaster) OnNewLongestChain(ctx context.Context, head models.Head)

func (*HeadBroadcaster) Start added in v0.10.3

func (hr *HeadBroadcaster) Start() error

func (*HeadBroadcaster) Subscribe added in v0.10.3

func (hr *HeadBroadcaster) Subscribe(callback HeadBroadcastable) (unsubscribe func())

type HeadTracker

type HeadTracker struct {
	// contains filtered or unexported fields
}

HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker

NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.

func (*HeadTracker) Backfill added in v0.10.3

func (ht *HeadTracker) Backfill(ctx context.Context, head models.Head, depth uint) (err error)

Backfill given a head will fill in any missing heads up to the given depth

func (*HeadTracker) Connected

func (ht *HeadTracker) Connected() bool

Connected returns whether or not this HeadTracker is connected.

func (*HeadTracker) HighestSeenHead added in v0.8.5

func (ht *HeadTracker) HighestSeenHead() *models.Head

HighestSeenHead returns the block header with the highest number that has been seen, or nil

func (*HeadTracker) Save

func (ht *HeadTracker) Save(ctx context.Context, h models.Head) error

Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

Stop unsubscribes all connections and fires Disconnect.

type InitiatorSubscription

type InitiatorSubscription struct {
	*ManagedSubscription

	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	client eth.Client,
	runManager RunManager,
	nextHead *big.Int,
	config orm.ConfigReader,
	callback func(RunManager, models.LogRequest),
) (InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

type JobSpecNotice added in v0.9.10

type JobSpecNotice struct {
	JobID  models.JobID `json:"jobId"`
	Type   string       `json:"type"`
	Params models.JSON  `json:"params,omitempty"`
}

JobSpecNotice is sent to the External Initiator when JobSpecs are created.

func NewJobSpecNotice added in v0.9.10

func NewJobSpecNotice(initiator models.Initiator, js models.JobSpec) (*JobSpecNotice, error)

NewJobSpecNotice returns a new JobSpec.

type JobSubscriber

type JobSubscriber interface {
	store.HeadTrackable
	AddJob(job models.JobSpec, bn *models.Head) error
	RemoveJob(ID models.JobID) error
	Jobs() []models.JobSpec
	Stop() error
}

JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type ManagedSubscription

type ManagedSubscription struct {
	// contains filtered or unexported fields
}

ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.

func NewManagedSubscription

func NewManagedSubscription(logSubscriber eth.Client, filter ethereum.FilterQuery, callback func(models.Log), backfillBatchSize uint32) (*ManagedSubscription, error)

NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.

func (ManagedSubscription) Unsubscribe

func (sub ManagedSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type NullBalanceMonitor added in v0.9.3

type NullBalanceMonitor struct{}

func (*NullBalanceMonitor) Connect added in v0.9.3

func (*NullBalanceMonitor) Connect(head *models.Head) error

func (*NullBalanceMonitor) Disconnect added in v0.9.3

func (*NullBalanceMonitor) Disconnect()

func (*NullBalanceMonitor) GetEthBalance added in v0.9.3

func (*NullBalanceMonitor) GetEthBalance(gethCommon.Address) *assets.Eth

func (*NullBalanceMonitor) OnNewLongestChain added in v0.9.3

func (*NullBalanceMonitor) OnNewLongestChain(ctx context.Context, head models.Head)

func (*NullBalanceMonitor) Stop added in v0.9.3

func (*NullBalanceMonitor) Stop() error

type NullExternalInitiatorManager added in v0.9.10

type NullExternalInitiatorManager struct{}

func (NullExternalInitiatorManager) DeleteJob added in v0.9.10

func (NullExternalInitiatorManager) DeleteJob(db *gorm.DB, jobID models.JobID) error

func (NullExternalInitiatorManager) Notify added in v0.9.10

type OneTime

type OneTime struct {
	Store      *store.Store
	Clock      utils.Afterer
	RunManager RunManager
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initiator models.Initiator, job models.JobSpec)

RunJobAt wait until the Stop() function has been called on the run or the specified time for the run is after the present time.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.

type PrometheusBackend added in v0.9.6

type PrometheusBackend interface {
	SetUnconfirmedTransactions(int64)
	SetMaxUnconfirmedBlocks(int64)
	SetPipelineRunsQueued(n int)
	SetPipelineTaskRunsQueued(n int)
}

type Recurring

type Recurring struct {
	Cron  Cron
	Clock utils.Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(runManager RunManager) *Recurring

NewRecurring create a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators, adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
	// contains filtered or unexported fields
}

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.

type RunExecutor added in v0.8.2

type RunExecutor interface {
	Execute(uuid.UUID) error
}

RunExecutor handles the actual running of the job tasks

func NewRunExecutor added in v0.8.2

func NewRunExecutor(store *store.Store, statsPusher synchronization.StatsPusher) RunExecutor

NewRunExecutor initializes a RunExecutor.

type RunManager added in v0.8.2

type RunManager interface {
	Create(
		jobSpecID models.JobID,
		initiator *models.Initiator,
		creationHeight *big.Int,
		runRequest *models.RunRequest) (*models.JobRun, error)
	CreateErrored(
		jobSpecID models.JobID,
		initiator models.Initiator,
		err error) (*models.JobRun, error)
	ResumePendingBridge(
		runID uuid.UUID,
		input models.BridgeRunResult) error
	Cancel(runID uuid.UUID) (*models.JobRun, error)

	ResumeAllInProgress() error
	ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error
	ResumeAllPendingConnection() error
}

RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue

func NewRunManager added in v0.8.2

func NewRunManager(
	runQueue RunQueue,
	config orm.ConfigReader,
	orm *orm.ORM,
	statsPusher synchronization.StatsPusher,
	clock utils.AfterNower) RunManager

NewRunManager returns a new job manager

type RunQueue added in v0.8.2

type RunQueue interface {
	Start() error
	Stop()
	Run(uuid.UUID)

	WorkerCount() int
}

RunQueue safely handles coordinating job runs.

func NewRunQueue added in v0.8.2

func NewRunQueue(runExecutor RunExecutor) RunQueue

NewRunQueue initializes a RunQueue.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store, runManager RunManager) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for both Recurring and OneTime Stop function. Sets the started field to false.

type Unsubscriber

type Unsubscriber interface {
	Unsubscribe()
}

Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.

Directories

Path Synopsis
job
log
signatures
cryptotest
package cryptotest provides convenience functions for kyber-based APIs.
package cryptotest provides convenience functions for kyber-based APIs.
ethdss
Package ethdss implements the Distributed Schnorr Signature protocol from the ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package ethdss implements the Distributed Schnorr Signature protocol from the ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
ethschnorr
Package ethschnorr implements a version of the Schnorr signature which is ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package ethschnorr implements a version of the Schnorr signature which is ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
secp256k1
Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Numbers are deterministically generated from seeds and a secret key, and are statistically indistinguishable from uniform sampling from {0,...,2**256-1}, to computationally-bounded observers who know the seeds, don't know the key, and only see the generated numbers.
Numbers are deterministically generated from seeds and a secret key, and are statistically indistinguishable from uniform sampling from {0,...,2**256-1}, to computationally-bounded observers who know the seeds, don't know the key, and only see the generated numbers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL