Documentation ¶
Overview ¶
Package services contains the key components of the Adamoracle node. This includes the Application, JobRunner, LogListener, and Scheduler.
Application ¶
The Application is the main component used for starting and stopping the Adamoracle node.
JobRunner ¶
The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.
JobSubscriber ¶
The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.
Scheduler ¶
The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.
Index ¶
- func ExpectedRecurringScheduleJobError(err error) bool
- func NewExternalInitiatorManager() *externalInitiatorManager
- func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) store.HeadTrackable
- func NewRun(job *models.JobSpec, initiator *models.Initiator, currentHeight *big.Int, ...) (*models.JobRun, []*adapters.PipelineAdapter)
- func NewStoreReaper(store *store.Store) utils.SleeperTask
- func ReceiveLogRequest(runManager RunManager, le models.LogRequest)
- func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error
- func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error
- func ValidateExternalInitiator(exi *models.ExternalInitiatorRequest, store *store.Store) error
- func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error
- func ValidateJob(j models.JobSpec, store *store.Store) error
- func ValidateRun(run *models.JobRun, contractCost *assets.Adam)
- func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
- type BalanceMonitor
- type Cron
- type HeadBroadcastable
- type HeadBroadcaster
- func (hr *HeadBroadcaster) Close() error
- func (hr *HeadBroadcaster) Connect(head *models.Head) error
- func (hr *HeadBroadcaster) Disconnect()
- func (hr *HeadBroadcaster) OnNewLongestChain(ctx context.Context, head models.Head)
- func (hr *HeadBroadcaster) Start() error
- func (hr *HeadBroadcaster) Subscribe(callback HeadBroadcastable) (unsubscribe func())
- type HeadTracker
- func (ht *HeadTracker) Backfill(ctx context.Context, head models.Head, depth uint) (err error)
- func (ht *HeadTracker) Connected() bool
- func (ht *HeadTracker) HighestSeenHead() *models.Head
- func (ht *HeadTracker) HighestSeenHeadFromDB() (*models.Head, error)
- func (ht *HeadTracker) Save(ctx context.Context, h models.Head) error
- func (ht *HeadTracker) Start() error
- func (ht *HeadTracker) Stop() error
- type InitiatorSubscription
- type JobSpecNotice
- type JobSubscriber
- type JobSubscription
- type ManagedSubscription
- type NullBalanceMonitor
- type NullExternalInitiatorManager
- type OneTime
- type PrometheusBackend
- type Recurring
- type RecurringScheduleJobError
- type RunExecutor
- type RunManager
- type RunQueue
- type Scheduler
- type Unsubscriber
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewExternalInitiatorManager ¶
func NewExternalInitiatorManager() *externalInitiatorManager
NewExternalInitiatorManager returns the concrete externalInitiatorManager
func NewPromReporter ¶
func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) store.HeadTrackable
func NewRun ¶
func NewRun( job *models.JobSpec, initiator *models.Initiator, currentHeight *big.Int, runRequest *models.RunRequest, config orm.ConfigReader, orm *orm.ORM, now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)
NewRun returns a complete run from a JobSpec
func NewStoreReaper ¶
func NewStoreReaper(store *store.Store) utils.SleeperTask
NewStoreReaper creates a reaper that cleans stale objects from the store.
func ReceiveLogRequest ¶
func ReceiveLogRequest(runManager RunManager, le models.LogRequest)
ReceiveLogRequest parses the log and runs the job indicated by its GetJobSpecID method.
func ValidateBridgeType ¶
func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error
ValidateBridgeType checks that the bridge type does not have a duplicate or invalid name, or an invalid URL.
func ValidateBridgeTypeNotExist ¶
func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error
ValidateBridgeTypeNotExist checks that a bridge has not already been created
func ValidateExternalInitiator ¶
func ValidateExternalInitiator( exi *models.ExternalInitiatorRequest, store *store.Store, ) error
ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.
func ValidateInitiator ¶
func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error
ValidateInitiator checks the Initiator for any application logic errors.
func ValidateJob ¶
func ValidateJob(j models.JobSpec, store *store.Store) error
ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.
func ValidateRun ¶
func ValidateRun(run *models.JobRun, contractCost *assets.Adam)
ValidateRun ensures that a run's initial preconditions have been met
func ValidateServiceAgreement ¶
func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error
ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.
Types ¶
type BalanceMonitor ¶
type BalanceMonitor interface { store.HeadTrackable GetEthBalance(gethCommon.Address) *assets.Eth Stop() error }
BalanceMonitor checks the balance for each key on every new head
func NewBalanceMonitor ¶
func NewBalanceMonitor(store *store.Store) BalanceMonitor
NewBalanceMonitor returns a new balanceMonitor
type HeadBroadcastable ¶
HeadBroadcastable defines the interface for listeners
type HeadBroadcaster ¶
type HeadBroadcaster struct { utils.StartStopOnce // contains filtered or unexported fields }
HeadBroadcaster relays heads from the head tracker to subscribed jobs. It is less robust against congestion than the head tracker, so consuming jobs should expect missed heads.
func NewHeadBroadcaster ¶
func NewHeadBroadcaster() *HeadBroadcaster
NewHeadBroadcaster creates a new HeadBroadcaster
func (*HeadBroadcaster) Close ¶
func (hr *HeadBroadcaster) Close() error
func (*HeadBroadcaster) Disconnect ¶
func (hr *HeadBroadcaster) Disconnect()
func (*HeadBroadcaster) OnNewLongestChain ¶
func (hr *HeadBroadcaster) OnNewLongestChain(ctx context.Context, head models.Head)
func (*HeadBroadcaster) Start ¶
func (hr *HeadBroadcaster) Start() error
func (*HeadBroadcaster) Subscribe ¶
func (hr *HeadBroadcaster) Subscribe(callback HeadBroadcastable) (unsubscribe func())
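The Subscribe signature above returns an unsubscribe function. A minimal sketch of that pattern follows. It is not the package's implementation: Head, Listener, Broadcaster, Publish, and recorder are hypothetical stand-ins, and the listener method only loosely mirrors OnNewLongestChain (the real one also takes a context.Context).

```go
package main

import (
	"fmt"
	"sync"
)

// Head is a hypothetical stand-in for models.Head.
type Head struct{ Number int64 }

// Listener is an assumed shape for a head listener; the real
// HeadBroadcastable interface is not shown in this documentation.
type Listener interface {
	OnNewLongestChain(head Head)
}

// Broadcaster is an illustrative fan-out, not the package's HeadBroadcaster.
type Broadcaster struct {
	mu        sync.RWMutex
	nextID    int
	listeners map[int]Listener
}

func NewBroadcaster() *Broadcaster {
	return &Broadcaster{listeners: make(map[int]Listener)}
}

// Subscribe registers a listener and returns a func that removes it again.
func (b *Broadcaster) Subscribe(l Listener) (unsubscribe func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.nextID
	b.nextID++
	b.listeners[id] = l
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.listeners, id)
	}
}

// Publish delivers head to every current listener. It holds the read lock
// during delivery, so a listener must not call unsubscribe from its callback.
func (b *Broadcaster) Publish(head Head) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, l := range b.listeners {
		l.OnNewLongestChain(head)
	}
}

// recorder collects the head numbers it receives.
type recorder struct{ seen []int64 }

func (r *recorder) OnNewLongestChain(head Head) { r.seen = append(r.seen, head.Number) }

func main() {
	b := NewBroadcaster()
	r := &recorder{}
	unsubscribe := b.Subscribe(r)
	b.Publish(Head{Number: 1})
	unsubscribe()
	b.Publish(Head{Number: 2}) // no longer delivered to r
	fmt.Println(r.seen)
}
```

Returning a closure keeps the subscription ID private to the broadcaster, so callers cannot remove someone else's subscription by mistake.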
type HeadTracker ¶
type HeadTracker struct {
// contains filtered or unexported fields
}
HeadTracker holds and stores the latest block number seen by this particular node in a thread-safe manner. It reconstitutes the last block number from the data store on reboot.
func NewHeadTracker ¶
func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker
NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.
func (*HeadTracker) Backfill ¶
func (ht *HeadTracker) Backfill(ctx context.Context, head models.Head, depth uint) (err error)
Backfill given a head will fill in any missing heads up to the given depth
func (*HeadTracker) Connected ¶
func (ht *HeadTracker) Connected() bool
Connected returns whether or not this HeadTracker is connected.
func (*HeadTracker) HighestSeenHead ¶
func (ht *HeadTracker) HighestSeenHead() *models.Head
HighestSeenHead returns the block header with the highest number that has been seen, or nil
func (*HeadTracker) HighestSeenHeadFromDB ¶
func (ht *HeadTracker) HighestSeenHeadFromDB() (*models.Head, error)
func (*HeadTracker) Save ¶
func (ht *HeadTracker) Save(ctx context.Context, h models.Head) error
Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
func (*HeadTracker) Start ¶
func (ht *HeadTracker) Start() error
Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.
func (*HeadTracker) Stop ¶
func (ht *HeadTracker) Stop() error
Stop unsubscribes all connections and fires Disconnect.
type InitiatorSubscription ¶
type InitiatorSubscription struct { *ManagedSubscription Initiator models.Initiator // contains filtered or unexported fields }
InitiatorSubscription encapsulates all functionality needed to wrap an Ethereum subscription for use with an Adamoracle Initiator. Initiator-specific functionality is delegated to the callback.
func NewInitiatorSubscription ¶
func NewInitiatorSubscription( initr models.Initiator, client eth.Client, runManager RunManager, nextHead *big.Int, config orm.ConfigReader, callback func(RunManager, models.LogRequest), ) (InitiatorSubscription, error)
NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.
type JobSpecNotice ¶
type JobSpecNotice struct { JobID models.JobID `json:"jobId"` Type string `json:"type"` Params models.JSON `json:"params,omitempty"` }
JobSpecNotice is sent to the External Initiator when JobSpecs are created.
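To see what the struct tags above imply for the wire format, here is a small sketch that marshals a look-alike struct. The notice type, the encode helper, and the sample values are hypothetical; string and json.RawMessage stand in for models.JobID and models.JSON, whose real encodings may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notice copies the JSON tags of JobSpecNotice using stand-in field types.
type notice struct {
	JobID  string          `json:"jobId"`
	Type   string          `json:"type"`
	Params json.RawMessage `json:"params,omitempty"`
}

// encode returns the JSON form of n, or an error description.
func encode(n notice) string {
	b, err := json.Marshal(n)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Without Params, the "params" key is dropped because of omitempty.
	fmt.Println(encode(notice{JobID: "job-1", Type: "external"}))

	// With Params, the raw JSON is embedded as-is.
	fmt.Println(encode(notice{
		JobID:  "job-1",
		Type:   "external",
		Params: json.RawMessage(`{"endpoint":"demo"}`),
	}))
}
```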
func NewJobSpecNotice ¶
NewJobSpecNotice returns a new JobSpecNotice.
type JobSubscriber ¶
type JobSubscriber interface { store.HeadTrackable AddJob(job models.JobSpec, bn *models.Head) error RemoveJob(ID models.JobID) error Jobs() []models.JobSpec Stop() error }
JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.
func NewJobSubscriber ¶
func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber
NewJobSubscriber returns a new job subscriber.
type JobSubscription ¶
JobSubscription listens to event logs being pushed from the Ethereum Node to a job.
func StartJobSubscription ¶
func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)
StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.
func (JobSubscription) Unsubscribe ¶
func (js JobSubscription) Unsubscribe()
Unsubscribe stops the subscription and cleans up associated resources.
type ManagedSubscription ¶
type ManagedSubscription struct {
// contains filtered or unexported fields
}
ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.
func NewManagedSubscription ¶
func NewManagedSubscription(logSubscriber eth.Client, filter ethereum.FilterQuery, callback func(types.Log), backfillBatchSize uint32) (*ManagedSubscription, error)
NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.
func (ManagedSubscription) Unsubscribe ¶
func (sub ManagedSubscription) Unsubscribe()
Unsubscribe closes channels and cleans up resources.
type NullBalanceMonitor ¶
type NullBalanceMonitor struct{}
func (*NullBalanceMonitor) Disconnect ¶
func (*NullBalanceMonitor) Disconnect()
func (*NullBalanceMonitor) GetEthBalance ¶
func (*NullBalanceMonitor) GetEthBalance(gethCommon.Address) *assets.Eth
func (*NullBalanceMonitor) OnNewLongestChain ¶
func (*NullBalanceMonitor) OnNewLongestChain(ctx context.Context, head models.Head)
func (*NullBalanceMonitor) Stop ¶
func (*NullBalanceMonitor) Stop() error
type NullExternalInitiatorManager ¶
type NullExternalInitiatorManager struct{}
type OneTime ¶
type OneTime struct { Store *store.Store Clock utils.Afterer RunManager RunManager // contains filtered or unexported fields }
OneTime represents runs that are to be executed only once.
func (*OneTime) RunJobAt ¶
RunJobAt waits until the Stop() function has been called on the run or the specified time for the run has passed.
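Waiting for "either a stop signal or a point in time" is commonly written in Go as a select over a timer and a done channel. The sketch below assumes that shape; runAt, demo, and the done channel are hypothetical names and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// runAt blocks until `at` has passed or done is closed, whichever comes
// first. It calls run and returns true only when the time was reached.
func runAt(at time.Time, done <-chan struct{}, run func()) bool {
	timer := time.NewTimer(time.Until(at))
	defer timer.Stop()
	select {
	case <-done:
		return false // stopped before the scheduled time
	case <-timer.C:
		run()
		return true
	}
}

// demo exercises both outcomes: a run that fires and a run that is stopped.
func demo() (fired, firedAfterStop bool) {
	done := make(chan struct{})
	fired = runAt(time.Now().Add(10*time.Millisecond), done, func() {})

	close(done) // simulate Stop() being called
	firedAfterStop = runAt(time.Now().Add(time.Hour), done, func() {})
	return fired, firedAfterStop
}

func main() {
	fired, firedAfterStop := demo()
	fmt.Println(fired, firedAfterStop) // prints: true false
}
```

A timer is used instead of sleeping so that a stop request interrupts the wait immediately.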
type PrometheusBackend ¶
type Recurring ¶
Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().
func NewRecurring ¶
func NewRecurring(runManager RunManager) *Recurring
NewRecurring creates a new instance of Recurring, ready to use.
func (*Recurring) AddJob ¶
AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.
type RecurringScheduleJobError ¶
type RecurringScheduleJobError struct {
// contains filtered or unexported fields
}
RecurringScheduleJobError contains the field for the error message.
func (RecurringScheduleJobError) Error ¶
func (err RecurringScheduleJobError) Error() string
Error returns the error message for the run.
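The index lists ExpectedRecurringScheduleJobError(err error) bool, which suggests a predicate that recognises this error type. One way to write such a predicate is with errors.As, shown here on hypothetical names (scheduleError, isScheduleError); the package itself may use a plain type assertion or type switch instead.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduleError is a hypothetical stand-in for RecurringScheduleJobError.
type scheduleError struct{ msg string }

// Error returns the stored message, satisfying the error interface.
func (e scheduleError) Error() string { return e.msg }

// isScheduleError reports whether err is, or wraps, a scheduleError.
func isScheduleError(err error) bool {
	var target scheduleError
	return errors.As(err, &target)
}

func main() {
	base := scheduleError{msg: "cron initiator has ended"}
	wrapped := fmt.Errorf("adding job: %w", base)

	fmt.Println(isScheduleError(wrapped))                            // true
	fmt.Println(isScheduleError(errors.New("database unavailable"))) // false
}
```

Unlike a direct type assertion, errors.As also matches when the error has been wrapped with %w.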
type RunExecutor ¶
RunExecutor handles the actual running of the job tasks
func NewRunExecutor ¶
func NewRunExecutor(store *store.Store, statsPusher synchronization.StatsPusher) RunExecutor
NewRunExecutor initializes a RunExecutor.
type RunManager ¶
type RunManager interface { Create( jobSpecID models.JobID, initiator *models.Initiator, creationHeight *big.Int, runRequest *models.RunRequest) (*models.JobRun, error) CreateErrored( jobSpecID models.JobID, initiator models.Initiator, err error) (*models.JobRun, error) ResumePendingBridge( runID uuid.UUID, input models.BridgeRunResult) error Cancel(runID uuid.UUID) (*models.JobRun, error) ResumeAllInProgress() error ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error ResumeAllPendingConnection() error }
RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue
func NewRunManager ¶
func NewRunManager( runQueue RunQueue, config orm.ConfigReader, orm *orm.ORM, statsPusher synchronization.StatsPusher, clock utils.AfterNower) RunManager
NewRunManager returns a new job manager
type RunQueue ¶
RunQueue safely handles coordinating job runs.
func NewRunQueue ¶
func NewRunQueue(runExecutor RunExecutor) RunQueue
NewRunQueue initializes a RunQueue.
type Scheduler ¶
type Scheduler struct { Recurring *Recurring OneTime *OneTime // contains filtered or unexported fields }
Scheduler contains the Recurring and OneTime fields for scheduled occurrences, a pointer to the store, and a started field that indicates whether the Scheduler has started.
func NewScheduler ¶
func NewScheduler(store *store.Store, runManager RunManager) *Scheduler
NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.
func (*Scheduler) AddJob ¶
AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.
type Unsubscriber ¶
type Unsubscriber interface {
Unsubscribe()
}
Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.
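A single-method interface like this lets callers clean up mixed subscription types uniformly. The following sketch re-declares the interface locally so it compiles on its own; logSub and unsubscribeAll are hypothetical.

```go
package main

import "fmt"

// Unsubscriber is re-declared here only to keep the sketch self-contained.
type Unsubscriber interface {
	Unsubscribe()
}

// logSub is a hypothetical subscription that records whether it was closed.
type logSub struct {
	name   string
	closed bool
}

func (s *logSub) Unsubscribe() { s.closed = true }

// unsubscribeAll calls Unsubscribe on every element and returns the count.
func unsubscribeAll(subs []Unsubscriber) int {
	for _, s := range subs {
		s.Unsubscribe()
	}
	return len(subs)
}

func main() {
	a, b := &logSub{name: "a"}, &logSub{name: "b"}
	n := unsubscribeAll([]Unsubscriber{a, b})
	fmt.Println(n, a.closed, b.closed) // prints: 2 true true
}
```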
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
signatures | |
cryptotest | package cryptotest provides convenience functions for kyber-based APIs. |
ethdss | Package ethdss implements the Distributed Schnorr Signature protocol from the ... XXX: Do not use in production until this code has been audited. |
ethschnorr | Package ethschnorr implements a version of the Schnorr signature which is ... XXX: Do not use in production until this code has been audited. |
secp256k1 | Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ... XXX: Do not use in production until this code has been audited. |
 | Numbers are deterministically generated from seeds and a secret key, and are statistically indistinguishable from uniform sampling from {0,...,2**256-1}, to computationally-bounded observers who know the seeds, don't know the key, and only see the generated numbers. |