services

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2021 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Package services contain the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.

JobSubscriber

The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExpectedRecurringScheduleJobError

func ExpectedRecurringScheduleJobError(err error) bool

func NewPromReporter

func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) *promReporter

func NewRun

func NewRun(
	job *models.JobSpec,
	initiator *models.Initiator,
	currentHeight *big.Int,
	runRequest *models.RunRequest,
	config orm.ConfigReader,
	orm *orm.ORM,
	now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)

NewRun returns a complete run from a JobSpec

func NewSessionReaper

func NewSessionReaper(store *store.Store) utils.SleeperTask

NewSessionReaper creates a reaper that cleans stale sessions from the store.

func ProcessLogRequest

func ProcessLogRequest(runManager RunManager, le models.LogRequest)

ReceiveLogRequest parses the log and runs the job it indicated by its GetJobSpecID method

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name or invalid url

func ValidateBridgeTypeNotExist

func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeTypeNotExist checks that a bridge has not already been created

func ValidateExternalInitiator

func ValidateExternalInitiator(
	exi *models.ExternalInitiatorRequest,
	store *store.Store,
) error

ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store, keyStore *keystore.Master) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateRun

func ValidateRun(run *models.JobRun, contractCost *assets.Link)

ValidateRun ensures that a run's initial preconditions have been met

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store, keyStore *keystore.Master) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

Types

type BalanceMonitor

type BalanceMonitor interface {
	httypes.HeadTrackable
	GetEthBalance(gethCommon.Address) *assets.Eth
	service.Service
}

BalanceMonitor checks the balance for each key on every new head

func NewBalanceMonitor

func NewBalanceMonitor(store *store.Store, ethKeyStore *keystore.Eth) BalanceMonitor

NewBalanceMonitor returns a new balanceMonitor

type Cron

type Cron interface {
	Start()
	Stop() context.Context
	AddFunc(string, func()) (cron.EntryID, error)
}

type InitiatorSubscription

type InitiatorSubscription struct {
	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	client eth.Client,
	runManager RunManager,
	filter ethereum.FilterQuery,
	backfillBatchSize uint32,
	callback func(RunManager, models.LogRequest),
) (*InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

func (*InitiatorSubscription) Start

func (sub *InitiatorSubscription) Start()

func (*InitiatorSubscription) Unsubscribe

func (sub *InitiatorSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type JobSubscriber

type JobSubscriber interface {
	httypes.HeadTrackable
	AddJob(job models.JobSpec, bn *models.Head) error
	RemoveJob(ID models.JobID) error
	Jobs() []models.JobSpec
	service.Service
}

JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type NullBalanceMonitor

type NullBalanceMonitor struct{}

func (*NullBalanceMonitor) Close

func (*NullBalanceMonitor) Close() error

func (*NullBalanceMonitor) Connect

func (*NullBalanceMonitor) Connect(head *models.Head) error

func (*NullBalanceMonitor) Disconnect

func (*NullBalanceMonitor) Disconnect()

func (*NullBalanceMonitor) GetEthBalance

func (*NullBalanceMonitor) GetEthBalance(gethCommon.Address) *assets.Eth

func (*NullBalanceMonitor) Healthy

func (*NullBalanceMonitor) Healthy() error

func (*NullBalanceMonitor) OnNewLongestChain

func (*NullBalanceMonitor) OnNewLongestChain(ctx context.Context, head models.Head)

func (*NullBalanceMonitor) Ready

func (*NullBalanceMonitor) Ready() error

func (*NullBalanceMonitor) Start

func (*NullBalanceMonitor) Start() error

type NullJobSubscriber

type NullJobSubscriber struct{}

NullJobSubscriber implements Null pattern for JobSubscriber interface

func (NullJobSubscriber) AddJob

func (NullJobSubscriber) AddJob(job models.JobSpec, bn *models.Head) error

func (NullJobSubscriber) Close

func (NullJobSubscriber) Close() error

func (NullJobSubscriber) Connect

func (NullJobSubscriber) Connect(head *models.Head) error

func (NullJobSubscriber) Healthy

func (NullJobSubscriber) Healthy() error

func (NullJobSubscriber) Jobs

func (NullJobSubscriber) Jobs() (j []models.JobSpec)

func (NullJobSubscriber) OnNewLongestChain

func (NullJobSubscriber) OnNewLongestChain(ctx context.Context, head models.Head)

func (NullJobSubscriber) Ready

func (NullJobSubscriber) Ready() error

func (NullJobSubscriber) RemoveJob

func (NullJobSubscriber) RemoveJob(ID models.JobID) error

func (NullJobSubscriber) Start

func (NullJobSubscriber) Start() error

type NullRunExecutor

type NullRunExecutor struct{}

NullRunExecutor implements Null pattern for RunExecutor interface

func (NullRunExecutor) Execute

func (NullRunExecutor) Execute(uuid.UUID) error

type NullRunManager

type NullRunManager struct{}

NullRunManager implements Null pattern for RunManager interface

func (NullRunManager) Cancel

func (NullRunManager) Cancel(runID uuid.UUID) (*models.JobRun, error)

func (NullRunManager) Create

func (NullRunManager) Create(jobSpecID models.JobID, initiator *models.Initiator, creationHeight *big.Int, runRequest *models.RunRequest) (*models.JobRun, error)

func (NullRunManager) CreateErrored

func (NullRunManager) CreateErrored(jobSpecID models.JobID, initiator models.Initiator, err error) (*models.JobRun, error)

func (NullRunManager) ResumeAllInProgress

func (NullRunManager) ResumeAllInProgress() error

func (NullRunManager) ResumeAllPendingConnection

func (NullRunManager) ResumeAllPendingConnection() error

func (NullRunManager) ResumeAllPendingNextBlock

func (NullRunManager) ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error

func (NullRunManager) ResumePendingBridge

func (NullRunManager) ResumePendingBridge(runID uuid.UUID, input models.BridgeRunResult) error

type NullRunQueue

type NullRunQueue struct{}

NullRunQueue implements Null pattern for RunQueue interface

func (NullRunQueue) Close

func (NullRunQueue) Close() error

func (NullRunQueue) Healthy

func (NullRunQueue) Healthy() error

func (NullRunQueue) Ready

func (NullRunQueue) Ready() error

func (NullRunQueue) Run

func (NullRunQueue) Run(uuid.UUID)

func (NullRunQueue) Start

func (NullRunQueue) Start() error

func (NullRunQueue) WorkerCount

func (NullRunQueue) WorkerCount() int

type OneTime

type OneTime struct {
	Store      *store.Store
	Clock      utils.Afterer
	RunManager RunManager
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initiator models.Initiator, job models.JobSpec)

RunJobAt wait until the Stop() function has been called on the run or the specified time for the run is after the present time.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.

type PrometheusBackend

type PrometheusBackend interface {
	SetUnconfirmedTransactions(int64)
	SetMaxUnconfirmedAge(float64)
	SetMaxUnconfirmedBlocks(int64)
	SetPipelineRunsQueued(n int)
	SetPipelineTaskRunsQueued(n int)
}

type Recurring

type Recurring struct {
	Cron  Cron
	Clock utils.Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(runManager RunManager) *Recurring

NewRecurring create a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators, adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
	// contains filtered or unexported fields
}

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.

type RunExecutor

type RunExecutor interface {
	Execute(uuid.UUID) error
}

RunExecutor handles the actual running of the job tasks

func NewRunExecutor

func NewRunExecutor(store *store.Store, keyStore *keystore.Master, statsPusher synchronization.StatsPusher) RunExecutor

NewRunExecutor initializes a RunExecutor.

type RunManager

type RunManager interface {
	Create(
		jobSpecID models.JobID,
		initiator *models.Initiator,
		creationHeight *big.Int,
		runRequest *models.RunRequest) (*models.JobRun, error)
	CreateErrored(
		jobSpecID models.JobID,
		initiator models.Initiator,
		err error) (*models.JobRun, error)
	ResumePendingBridge(
		runID uuid.UUID,
		input models.BridgeRunResult) error
	Cancel(runID uuid.UUID) (*models.JobRun, error)

	ResumeAllInProgress() error
	ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error
	ResumeAllPendingConnection() error
}

RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue

func NewRunManager

func NewRunManager(
	runQueue RunQueue,
	config orm.ConfigReader,
	orm *orm.ORM,
	statsPusher synchronization.StatsPusher,
	clock utils.AfterNower) RunManager

NewRunManager returns a new job manager

type RunQueue

type RunQueue interface {
	service.Service

	Run(uuid.UUID)

	WorkerCount() int
}

RunQueue safely handles coordinating job runs.

func NewRunQueue

func NewRunQueue(runExecutor RunExecutor) RunQueue

NewRunQueue initializes a RunQueue.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime

	utils.StartStopOnce
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store, runManager RunManager) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop is the governing function for both Recurring and OneTime Stop function. Sets the started field to false.

Directories

Path Synopsis
eth
job
keys/vrfkey
Package vrfkey tracks the secret keys associated with VRF proofs.
Package vrfkey tracks the secret keys associated with VRF proofs.
log
signatures
cryptotest
Package cryptotest provides convenience functions for kyber-based APIs.
Package cryptotest provides convenience functions for kyber-based APIs.
ethdss
Package ethdss implements the Distributed Schnorr Signature protocol from the ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package ethdss implements the Distributed Schnorr Signature protocol from the ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
ethschnorr
Package ethschnorr implements a version of the Schnorr signature which is ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package ethschnorr implements a version of the Schnorr signature which is ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
secp256k1
Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ////////////////////////////////////////////////////////////////////////////// XXX: Do not use in production until this code has been audited.
vrf

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL