job

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2022 License: MIT Imports: 34 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoSuchKeyBundle          = errors.New("no such key bundle exists")
	ErrNoSuchTransmitterAddress = errors.New("no such transmitter address exists")
	ErrNoSuchPublicKey          = errors.New("no such public key exists")
)
View Source
var (
	ErrNoPipelineSpec       = errors.New("pipeline spec not specified")
	ErrInvalidJobType       = errors.New("invalid job type")
	ErrInvalidSchemaVersion = errors.New("invalid schema version")
)

Functions

func ExternalJobIDEncodeBytesToTopic added in v1.0.0

func ExternalJobIDEncodeBytesToTopic(id uuid.UUID) common.Hash

func ExternalJobIDEncodeStringToTopic added in v1.0.0

func ExternalJobIDEncodeStringToTopic(id uuid.UUID) common.Hash

func LoadAllJobTypes added in v1.1.0

func LoadAllJobTypes(tx pg.Queryer, job *Job) error

func LoadAllJobsTypes added in v1.1.0

func LoadAllJobsTypes(tx pg.Queryer, jobs []Job) error

NOTE: N+1 query, be careful of performance This is not easily fixable without complicating the logic a lot, since we only use it in the GUI it's probably acceptable

func NewORM

func NewORM(
	db *sqlx.DB,
	chainSet evm.ChainSet,
	pipelineORM pipeline.ORM,
	keyStore keystore.Master,
	lggr logger.Logger,
	cfg pg.LogConfig,
) *orm

func NewSpawner

func NewSpawner(orm ORM, config Config, jobTypeDelegates map[Type]Delegate, db *sqlx.DB, lggr logger.Logger, lbDependentAwaiters []utils.DependentAwaiter) *spawner

Types

type Config

type Config interface {
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() url.URL
	TriggerFallbackDBPollInterval() time.Duration
	LogSQL() bool
}

type CronSpec added in v0.10.5

type CronSpec struct {
	ID           int32     `toml:"-"`
	CronSchedule string    `toml:"schedule"`
	CreatedAt    time.Time `toml:"-"`
	UpdatedAt    time.Time `toml:"-"`
}

func (CronSpec) GetID added in v0.10.5

func (s CronSpec) GetID() string

func (*CronSpec) SetID added in v0.10.5

func (s *CronSpec) SetID(value string) error

func (CronSpec) TableName added in v0.10.5

func (CronSpec) TableName() string

type DRSpecConfig added in v1.1.0

type DRSpecConfig interface {
	MinIncomingConfirmations() uint32
}

type Delegate

type Delegate interface {
	JobType() Type
	// ServicesForSpec returns services to be started and stopped for this
	// job. In case a given job type relies upon well-defined startup/shutdown
	// ordering for services, they are started in the order they are given
	// and stopped in reverse order.
	ServicesForSpec(spec Job) ([]Service, error)
	AfterJobCreated(spec Job)
	BeforeJobDeleted(spec Job)
}

TODO(spook): I can't wait for Go generics

type DirectRequestSpec added in v0.9.9

type DirectRequestSpec struct {
	ID                          int32                    `toml:"-"`
	ContractAddress             ethkey.EIP55Address      `toml:"contractAddress"`
	MinIncomingConfirmations    clnull.Uint32            `toml:"minIncomingConfirmations"`
	MinIncomingConfirmationsEnv bool                     `toml:"minIncomingConfirmationsEnv"`
	Requesters                  models.AddressCollection `toml:"requesters"`
	MinContractPayment          *assets.Link             `toml:"minContractPaymentLinkJuels"`
	EVMChainID                  *utils.Big               `toml:"evmChainID"`
	CreatedAt                   time.Time                `toml:"-"`
	UpdatedAt                   time.Time                `toml:"-"`
}

func LoadEnvConfigVarsDR added in v1.1.0

func LoadEnvConfigVarsDR(cfg DRSpecConfig, drs DirectRequestSpec) *DirectRequestSpec

func (DirectRequestSpec) TableName added in v0.9.9

func (DirectRequestSpec) TableName() string

type ExternalInitiatorWebhookSpec added in v0.10.11

type ExternalInitiatorWebhookSpec struct {
	ExternalInitiatorID int64
	ExternalInitiator   bridges.ExternalInitiator
	WebhookSpecID       int32
	WebhookSpec         WebhookSpec
	Spec                models.JSON
}

type FluxMonitorSpec added in v0.9.9

type FluxMonitorSpec struct {
	ID              int32               `toml:"-"`
	ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
	Threshold       float32             `toml:"threshold,float"`
	// AbsoluteThreshold is the maximum absolute change allowed in a fluxmonitored
	// value before a new round should be kicked off, so that the current value
	// can be reported on-chain.
	AbsoluteThreshold   float32 `toml:"absoluteThreshold,float"`
	PollTimerPeriod     time.Duration
	PollTimerDisabled   bool
	IdleTimerPeriod     time.Duration
	IdleTimerDisabled   bool
	DrumbeatSchedule    string
	DrumbeatRandomDelay time.Duration
	DrumbeatEnabled     bool
	MinPayment          *assets.Link
	EVMChainID          *utils.Big `toml:"evmChainID"`
	CreatedAt           time.Time  `toml:"-"`
	UpdatedAt           time.Time  `toml:"-"`
}

type FluxMonitorSpecIntThreshold added in v1.0.0

type FluxMonitorSpecIntThreshold struct {
	ContractAddress     ethkey.EIP55Address `toml:"contractAddress"`
	Threshold           int                 `toml:"threshold"`
	AbsoluteThreshold   int                 `toml:"absoluteThreshold"`
	PollTimerPeriod     time.Duration
	PollTimerDisabled   bool
	IdleTimerPeriod     time.Duration
	IdleTimerDisabled   bool
	DrumbeatSchedule    string
	DrumbeatRandomDelay time.Duration
	DrumbeatEnabled     bool
	MinPayment          *assets.Link
	EVMChainID          *utils.Big `toml:"evmChainID"`
}

Need to also try integer thresholds until https://github.com/pelletier/go-toml/issues/571 is addressed. The UI's TOML.stringify({"threshold": 1.0}) (https://github.com/iarna/iarna-toml) will return "threshold = 1" since ts/js doesn't know the difference between 1.0 and 1, so we need to address it on the backend.

type Job added in v0.10.3

type Job struct {
	ID                            int32     `toml:"-"`
	ExternalJobID                 uuid.UUID `toml:"externalJobID"`
	OffchainreportingOracleSpecID *int32
	OffchainreportingOracleSpec   *OffchainReportingOracleSpec
	CronSpecID                    *int32
	CronSpec                      *CronSpec
	DirectRequestSpecID           *int32
	DirectRequestSpec             *DirectRequestSpec
	FluxMonitorSpecID             *int32
	FluxMonitorSpec               *FluxMonitorSpec
	KeeperSpecID                  *int32
	KeeperSpec                    *KeeperSpec
	VRFSpecID                     *int32
	VRFSpec                       *VRFSpec
	WebhookSpecID                 *int32
	WebhookSpec                   *WebhookSpec
	PipelineSpecID                int32
	PipelineSpec                  *pipeline.Spec
	JobSpecErrors                 []SpecError
	Type                          Type
	SchemaVersion                 uint32
	Name                          null.String
	MaxTaskDuration               models.Interval
	Pipeline                      pipeline.Pipeline `toml:"observationSource"`
	CreatedAt                     time.Time
}

func (Job) ExternalIDEncodeBytesToTopic added in v0.10.10

func (j Job) ExternalIDEncodeBytesToTopic() common.Hash

The external job ID (UUID) can also be encoded into a log topic (32 bytes) by taking the 16 bytes undelying the UUID and right padding it.

func (Job) ExternalIDEncodeStringToTopic added in v0.10.10

func (j Job) ExternalIDEncodeStringToTopic() common.Hash

The external job ID (UUID) can be encoded into a log topic (32 bytes) by taking the string representation of the UUID, removing the dashes so that its 32 characters long and then encoding those characters to bytes.

func (*Job) SetID added in v0.10.5

func (j *Job) SetID(value string) error

SetID takes the id as a string and attempts to convert it to an int32. If it succeeds, it will set it as the id on the job

func (Job) TableName added in v0.10.3

func (j Job) TableName() string

type KeeperSpec added in v0.10.3

type KeeperSpec struct {
	ID                       int32               `toml:"-"`
	ContractAddress          ethkey.EIP55Address `toml:"contractAddress"`
	MinIncomingConfirmations *uint32             `toml:"minIncomingConfirmations"`
	FromAddress              ethkey.EIP55Address `toml:"fromAddress"`
	EVMChainID               *utils.Big          `toml:"evmChainID"`
	CreatedAt                time.Time           `toml:"-"`
	UpdatedAt                time.Time           `toml:"-"`
}

type NullDelegate added in v0.10.8

type NullDelegate struct {
	Type Type
}

func (*NullDelegate) AfterJobCreated added in v0.10.11

func (*NullDelegate) AfterJobCreated(spec Job)

func (*NullDelegate) BeforeJobDeleted added in v0.10.11

func (*NullDelegate) BeforeJobDeleted(spec Job)

func (*NullDelegate) JobType added in v0.10.8

func (n *NullDelegate) JobType() Type

func (*NullDelegate) ServicesForSpec added in v0.10.8

func (n *NullDelegate) ServicesForSpec(spec Job) (s []Service, err error)

type OCRSpecConfig added in v1.0.0

type OCRSpecConfig interface {
	P2PPeerID() p2pkey.PeerID
	OCRBlockchainTimeout() time.Duration
	OCRContractConfirmations() uint16
	OCRContractPollInterval() time.Duration
	OCRContractSubscribeInterval() time.Duration
	OCRObservationTimeout() time.Duration
	OCRTransmitterAddress() (ethkey.EIP55Address, error)
	OCRKeyBundleID() (string, error)
}

type ORM

type ORM interface {
	InsertWebhookSpec(webhookSpec *WebhookSpec, qopts ...pg.QOpt) error
	InsertJob(job *Job, qopts ...pg.QOpt) error
	CreateJob(jb *Job, qopts ...pg.QOpt) error
	FindJobs(offset, limit int) ([]Job, int, error)
	FindJobTx(id int32) (Job, error)
	FindJob(ctx context.Context, id int32) (Job, error)
	FindJobByExternalJobID(uuid uuid.UUID, qopts ...pg.QOpt) (Job, error)
	FindJobIDsWithBridge(name string) ([]int32, error)
	DeleteJob(id int32, qopts ...pg.QOpt) error
	RecordError(jobID int32, description string, qopts ...pg.QOpt) error
	// TryRecordError is a helper which calls RecordError and logs the returned error if present.
	TryRecordError(jobID int32, description string, qopts ...pg.QOpt)
	DismissError(ctx context.Context, errorID int32) error
	Close() error
	PipelineRuns(jobID *int32, offset, size int) ([]pipeline.Run, int, error)
	PipelineRunsByJobsIDs(jobsIDs []int32) (runs []pipeline.Run, err error)
}

type OffchainReportingOracleSpec added in v0.9.9

type OffchainReportingOracleSpec struct {
	ID                                        int32               `toml:"-"`
	ContractAddress                           ethkey.EIP55Address `toml:"contractAddress"`
	P2PPeerID                                 p2pkey.PeerID       `toml:"p2pPeerID" db:"p2p_peer_id"`
	P2PPeerIDEnv                              bool
	P2PBootstrapPeers                         pq.StringArray     `toml:"p2pBootstrapPeers" db:"p2p_bootstrap_peers"`
	IsBootstrapPeer                           bool               `toml:"isBootstrapPeer"`
	EncryptedOCRKeyBundleID                   *models.Sha256Hash `toml:"keyBundleID"`
	EncryptedOCRKeyBundleIDEnv                bool
	TransmitterAddress                        *ethkey.EIP55Address `toml:"transmitterAddress"`
	TransmitterAddressEnv                     bool
	ObservationTimeout                        models.Interval `toml:"observationTimeout"`
	ObservationTimeoutEnv                     bool
	BlockchainTimeout                         models.Interval `toml:"blockchainTimeout"`
	BlockchainTimeoutEnv                      bool
	ContractConfigTrackerSubscribeInterval    models.Interval `toml:"contractConfigTrackerSubscribeInterval"`
	ContractConfigTrackerSubscribeIntervalEnv bool
	ContractConfigTrackerPollInterval         models.Interval `toml:"contractConfigTrackerPollInterval"`
	ContractConfigTrackerPollIntervalEnv      bool
	ContractConfigConfirmations               uint16 `toml:"contractConfigConfirmations"`
	ContractConfigConfirmationsEnv            bool
	EVMChainID                                *utils.Big `toml:"evmChainID" db:"evm_chain_id"`
	CreatedAt                                 time.Time  `toml:"-"`
	UpdatedAt                                 time.Time  `toml:"-"`
}

func LoadEnvConfigVarsLocalOCR added in v1.1.0

func LoadEnvConfigVarsLocalOCR(cfg OCRSpecConfig, os OffchainReportingOracleSpec) *OffchainReportingOracleSpec

func LoadEnvConfigVarsOCR added in v1.1.0

func LoadEnvConfigVarsOCR(cfg OCRSpecConfig, p2pStore keystore.P2P, os OffchainReportingOracleSpec) (*OffchainReportingOracleSpec, error)

func (OffchainReportingOracleSpec) GetID added in v0.9.9

func (*OffchainReportingOracleSpec) SetID added in v0.9.9

func (s *OffchainReportingOracleSpec) SetID(value string) error

func (OffchainReportingOracleSpec) TableName added in v0.9.9

func (OffchainReportingOracleSpec) TableName() string

type PipelineRun added in v0.9.9

type PipelineRun struct {
	ID int64 `json:"-"`
}

func (PipelineRun) GetID added in v0.9.9

func (pr PipelineRun) GetID() string

func (*PipelineRun) SetID added in v0.9.9

func (pr *PipelineRun) SetID(value string) error

type Service

type Service interface {
	Start() error
	Close() error
}

type Spawner

type Spawner interface {
	service.Service
	CreateJob(jb *Job, qopts ...pg.QOpt) error
	DeleteJob(jobID int32, qopts ...pg.QOpt) error
	ActiveJobs() map[int32]Job

	// NOTE: Prefer to use CreateJob, this is only publicly exposed for use in tests
	// to start a job that was previously manually inserted into DB
	StartService(spec Job) error
}

The job spawner manages the spinning up and spinning down of the long-running services that perform the work described by job specs. Each active job spec has 1 or more of these services associated with it.

type SpecError added in v0.9.9

type SpecError struct {
	ID          int64
	JobID       int32
	Description string
	Occurrences uint
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

func (SpecError) TableName added in v0.9.9

func (SpecError) TableName() string

type Type

type Type string
const (
	Cron              Type = "cron"
	DirectRequest     Type = "directrequest"
	FluxMonitor       Type = "fluxmonitor"
	OffchainReporting Type = "offchainreporting"
	Keeper            Type = "keeper"
	VRF               Type = "vrf"
	Webhook           Type = "webhook"
)

func ValidateSpec added in v0.10.11

func ValidateSpec(ts string) (Type, error)

ValidateSpec is the common spec validation

func (Type) RequiresPipelineSpec added in v0.10.11

func (t Type) RequiresPipelineSpec() bool

func (Type) SchemaVersion added in v1.1.0

func (t Type) SchemaVersion() uint32

func (Type) String added in v0.9.9

func (t Type) String() string

func (Type) SupportsAsync added in v0.10.11

func (t Type) SupportsAsync() bool

type VRFSpec added in v0.10.8

type VRFSpec struct {
	ID                       int32
	CoordinatorAddress       ethkey.EIP55Address  `toml:"coordinatorAddress"`
	PublicKey                secp256k1.PublicKey  `toml:"publicKey"`
	MinIncomingConfirmations uint32               `toml:"minIncomingConfirmations"`
	ConfirmationsEnv         bool                 `toml:"-"`
	EVMChainID               *utils.Big           `toml:"evmChainID"`
	FromAddress              *ethkey.EIP55Address `toml:"fromAddress"`
	PollPeriod               time.Duration        `toml:"pollPeriod"` // For v2 jobs
	PollPeriodEnv            bool
	RequestedConfsDelay      int64     `toml:"requestedConfsDelay"` // For v2 jobs. Optional, defaults to 0 if not provided.
	CreatedAt                time.Time `toml:"-"`
	UpdatedAt                time.Time `toml:"-"`
}

func LoadEnvConfigVarsVRF added in v1.1.0

func LoadEnvConfigVarsVRF(cfg DRSpecConfig, vrfs VRFSpec) *VRFSpec

type WebhookSpec added in v0.10.8

type WebhookSpec struct {
	ID                            int32 `toml:"-"`
	ExternalInitiatorWebhookSpecs []ExternalInitiatorWebhookSpec
	CreatedAt                     time.Time `json:"createdAt" toml:"-"`
	UpdatedAt                     time.Time `json:"updatedAt" toml:"-"`
}

func (WebhookSpec) GetID added in v0.10.8

func (w WebhookSpec) GetID() string

func (*WebhookSpec) SetID added in v0.10.8

func (w *WebhookSpec) SetID(value string) error

func (WebhookSpec) TableName added in v0.10.8

func (WebhookSpec) TableName() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL