job

package
v0.10.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: MIT Imports: 28 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoSuchPeerID             = errors.New("no such peer id exists")
	ErrNoSuchKeyBundle          = errors.New("no such key bundle exists")
	ErrNoSuchTransmitterAddress = errors.New("no such transmitter address exists")
	ErrNoSuchPublicKey          = errors.New("no such public key exists")
)
View Source
var (
	ErrViolatesForeignKeyConstraint = errors.New("violates foreign key constraint")
)

Functions

func GetORMAdvisoryLockClassID added in v0.9.9

func GetORMAdvisoryLockClassID(oi ORM) int32

func GetORMClaimedJobIDs added in v0.9.9

func GetORMClaimedJobIDs(oi ORM) (ids []int32)

func NewORM

func NewORM(db *gorm.DB, config *storm.Config, pipelineORM pipeline.ORM, eventBroadcaster postgres.EventBroadcaster, advisoryLocker postgres.AdvisoryLocker) *orm

func NewSpawner

func NewSpawner(orm ORM, config Config, jobTypeDelegates map[Type]Delegate) *spawner

func PreloadAllJobTypes added in v0.10.8

func PreloadAllJobTypes(db *gorm.DB) *gorm.DB

func SetORMClaimedJobs added in v0.9.9

func SetORMClaimedJobs(oi ORM, jobs []Job)

Types

type Config

type Config interface {
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() url.URL
	TriggerFallbackDBPollInterval() time.Duration
}

type CronSpec added in v0.10.5

type CronSpec struct {
	ID           int32     `toml:"-" gorm:"primary_key"`
	CronSchedule string    `toml:"schedule"`
	CreatedAt    time.Time `toml:"-"`
	UpdatedAt    time.Time `toml:"-"`
}

func (*CronSpec) BeforeCreate added in v0.10.5

func (s *CronSpec) BeforeCreate(db *gorm.DB) error

func (*CronSpec) BeforeSave added in v0.10.5

func (s *CronSpec) BeforeSave(db *gorm.DB) error

func (CronSpec) GetID added in v0.10.5

func (s CronSpec) GetID() string

func (*CronSpec) SetID added in v0.10.5

func (s *CronSpec) SetID(value string) error

func (CronSpec) TableName added in v0.10.5

func (CronSpec) TableName() string

type Delegate

type Delegate interface {
	JobType() Type
	// ServicesForSpec returns services to be started and stopped for this
	// job. In case a given job type relies upon well-defined startup/shutdown
	// ordering for services, they are started in the order they are given
	// and stopped in reverse order.
	ServicesForSpec(spec Job) ([]Service, error)
	OnJobCreated(spec Job)
	OnJobDeleted(spec Job)
}

TODO(spook): I can't wait for Go generics

type DirectRequestSpec added in v0.9.9

type DirectRequestSpec struct {
	ID                       int32               `toml:"-" gorm:"primary_key"`
	ContractAddress          ethkey.EIP55Address `toml:"contractAddress"`
	MinIncomingConfirmations clnull.Uint32       `toml:"minIncomingConfirmations"`
	CreatedAt                time.Time           `toml:"-"`
	UpdatedAt                time.Time           `toml:"-"`
}

func (DirectRequestSpec) TableName added in v0.9.9

func (DirectRequestSpec) TableName() string

type FluxMonitorSpec added in v0.9.9

type FluxMonitorSpec struct {
	ID              int32               `toml:"-" gorm:"primary_key"`
	ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
	Precision       int32               `gorm:"type:smallint"`
	Threshold       float32             `toml:"threshold,float"`
	// AbsoluteThreshold is the maximum absolute change allowed in a fluxmonitored
	// value before a new round should be kicked off, so that the current value
	// can be reported on-chain.
	AbsoluteThreshold float32       `toml:"absoluteThreshold,float" gorm:"type:float;not null"`
	PollTimerPeriod   time.Duration `gorm:"type:jsonb"`
	PollTimerDisabled bool          `gorm:"type:jsonb"`
	IdleTimerPeriod   time.Duration `gorm:"type:jsonb"`
	IdleTimerDisabled bool          `gorm:"type:jsonb"`
	MinPayment        *assets.Link
	CreatedAt         time.Time `toml:"-"`
	UpdatedAt         time.Time `toml:"-"`
}

type Job added in v0.10.3

type Job struct {
	ID                            int32     `toml:"-" gorm:"primary_key"`
	ExternalJobID                 uuid.UUID `toml:"externalJobID"`
	OffchainreportingOracleSpecID *int32
	OffchainreportingOracleSpec   *OffchainReportingOracleSpec
	CronSpecID                    *int32
	CronSpec                      *CronSpec
	DirectRequestSpecID           *int32
	DirectRequestSpec             *DirectRequestSpec
	FluxMonitorSpecID             *int32
	FluxMonitorSpec               *FluxMonitorSpec
	KeeperSpecID                  *int32
	KeeperSpec                    *KeeperSpec
	VRFSpecID                     *int32
	VRFSpec                       *VRFSpec
	WebhookSpecID                 *int32
	WebhookSpec                   *WebhookSpec
	ExternalInitiator             *models.ExternalInitiator `toml:"-" gorm:"-"`
	PipelineSpecID                int32
	PipelineSpec                  *pipeline.Spec
	JobSpecErrors                 []SpecError `gorm:"foreignKey:JobID"`
	Type                          Type
	SchemaVersion                 uint32
	Name                          null.String
	MaxTaskDuration               models.Interval
	Pipeline                      pipeline.TaskDAG `toml:"observationSource" gorm:"-"`
}

func GetORMClaimedJobs added in v0.9.9

func GetORMClaimedJobs(oi ORM) (claimedJobs []Job)

func PopulateExternalInitiator added in v0.10.8

func PopulateExternalInitiator(db *gorm.DB, jb Job) (Job, error)

func (Job) ExternalIDToTopicHash added in v0.10.8

func (j Job) ExternalIDToTopicHash() common.Hash

func (*Job) SetID added in v0.10.5

func (j *Job) SetID(value string) error

SetID takes the id as a string and attempts to convert it to an int32. If it succeeds, it will set it as the id on the job

func (Job) TableName added in v0.10.3

func (j Job) TableName() string

type KeeperSpec added in v0.10.3

type KeeperSpec struct {
	ID              int32               `toml:"-" gorm:"primary_key"`
	ContractAddress ethkey.EIP55Address `toml:"contractAddress"`
	FromAddress     ethkey.EIP55Address `toml:"fromAddress"`
	CreatedAt       time.Time           `toml:"-"`
	UpdatedAt       time.Time           `toml:"-"`
}

type NullDelegate added in v0.10.8

type NullDelegate struct {
	Type Type
}

func (*NullDelegate) JobType added in v0.10.8

func (n *NullDelegate) JobType() Type

func (*NullDelegate) OnJobCreated added in v0.10.8

func (*NullDelegate) OnJobCreated(spec Job)

func (*NullDelegate) OnJobDeleted added in v0.10.8

func (*NullDelegate) OnJobDeleted(spec Job)

func (*NullDelegate) ServicesForSpec added in v0.10.8

func (n *NullDelegate) ServicesForSpec(spec Job) (s []Service, err error)

type ORM

type ORM interface {
	ListenForNewJobs() (postgres.Subscription, error)
	ListenForDeletedJobs() (postgres.Subscription, error)
	ClaimUnclaimedJobs(ctx context.Context) ([]Job, error)
	CreateJob(ctx context.Context, jobSpec *Job, taskDAG pipeline.TaskDAG) error
	JobsV2() ([]Job, error)
	FindJob(id int32) (Job, error)
	FindJobIDsWithBridge(name string) ([]int32, error)
	DeleteJob(ctx context.Context, id int32) error
	RecordError(ctx context.Context, jobID int32, description string)
	UnclaimJob(ctx context.Context, id int32) error
	CheckForDeletedJobs(ctx context.Context) (deletedJobIDs []int32, err error)
	Close() error
	PipelineRunsByJobID(jobID int32, offset, size int) ([]pipeline.Run, int, error)
}

type OffchainReportingOracleSpec added in v0.9.9

type OffchainReportingOracleSpec struct {
	ID                                     int32                `toml:"-" gorm:"primary_key"`
	ContractAddress                        ethkey.EIP55Address  `toml:"contractAddress"`
	P2PPeerID                              *p2pkey.PeerID       `toml:"p2pPeerID" gorm:"column:p2p_peer_id;default:null"`
	P2PBootstrapPeers                      pq.StringArray       `toml:"p2pBootstrapPeers" gorm:"column:p2p_bootstrap_peers;type:text[]"`
	IsBootstrapPeer                        bool                 `toml:"isBootstrapPeer"`
	EncryptedOCRKeyBundleID                *models.Sha256Hash   `toml:"keyBundleID" gorm:"type:bytea"`
	TransmitterAddress                     *ethkey.EIP55Address `toml:"transmitterAddress"`
	ObservationTimeout                     models.Interval      `toml:"observationTimeout" gorm:"type:bigint;default:null"`
	BlockchainTimeout                      models.Interval      `toml:"blockchainTimeout" gorm:"type:bigint;default:null"`
	ContractConfigTrackerSubscribeInterval models.Interval      `toml:"contractConfigTrackerSubscribeInterval" gorm:"default:null"`
	ContractConfigTrackerPollInterval      models.Interval      `toml:"contractConfigTrackerPollInterval" gorm:"type:bigint;default:null"`
	ContractConfigConfirmations            uint16               `toml:"contractConfigConfirmations"`
	CreatedAt                              time.Time            `toml:"-"`
	UpdatedAt                              time.Time            `toml:"-"`
}

TODO: remove pointers when upgrading to gormv2 which has https://github.com/go-gorm/gorm/issues/2748 fixed.

func (*OffchainReportingOracleSpec) BeforeCreate added in v0.9.9

func (s *OffchainReportingOracleSpec) BeforeCreate(db *gorm.DB) error

func (*OffchainReportingOracleSpec) BeforeSave added in v0.9.9

func (s *OffchainReportingOracleSpec) BeforeSave(db *gorm.DB) error

func (OffchainReportingOracleSpec) GetID added in v0.9.9

func (*OffchainReportingOracleSpec) SetID added in v0.9.9

func (s *OffchainReportingOracleSpec) SetID(value string) error

func (OffchainReportingOracleSpec) TableName added in v0.9.9

func (OffchainReportingOracleSpec) TableName() string

type PipelineRun added in v0.9.9

type PipelineRun struct {
	ID int64 `json:"-" gorm:"primary_key"`
}

func (PipelineRun) GetID added in v0.9.9

func (pr PipelineRun) GetID() string

func (*PipelineRun) SetID added in v0.9.9

func (pr *PipelineRun) SetID(value string) error

type Service

type Service interface {
	Start() error
	Close() error
}

type Spawner

type Spawner interface {
	service.Service
	CreateJob(ctx context.Context, spec Job, name null.String) (int32, error)
	DeleteJob(ctx context.Context, jobID int32) error
	ActiveJobs() map[int32]Job
}

The job spawner manages the spinning up and spinning down of the long-running services that perform the work described by job specs. Each active job spec has 1 or more of these services associated with it.

At present, Flux Monitor and Offchain Reporting jobs can only have a single "initiator", meaning that they only require a single service. But the older "direct request" model allows for multiple initiators, which imply multiple services.

type SpecError added in v0.9.9

type SpecError struct {
	ID          int64 `gorm:"primary_key"`
	JobID       int32
	Description string
	Occurrences uint
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

func (SpecError) TableName added in v0.9.9

func (SpecError) TableName() string

type Type

type Type string
const (
	Cron              Type = "cron"
	DirectRequest     Type = "directrequest"
	FluxMonitor       Type = "fluxmonitor"
	OffchainReporting Type = "offchainreporting"
	Keeper            Type = "keeper"
	VRF               Type = "vrf"
	Webhook           Type = "webhook"
)

func (Type) HasPipelineSpec added in v0.10.8

func (t Type) HasPipelineSpec() bool

func (Type) String added in v0.9.9

func (t Type) String() string

type VRFSpec added in v0.10.8

type VRFSpec struct {
	ID                 int32
	CoordinatorAddress ethkey.EIP55Address `toml:"coordinatorAddress"`
	PublicKey          secp256k1.PublicKey `toml:"publicKey"`
	Confirmations      uint32              `toml:"confirmations"`
	CreatedAt          time.Time           `toml:"-"`
	UpdatedAt          time.Time           `toml:"-"`
}

type WebhookSpec added in v0.10.8

type WebhookSpec struct {
	ID                    int32        `toml:"-" gorm:"primary_key"`
	ExternalInitiatorName null.String  `toml:"externalInitiatorName"`
	ExternalInitiatorSpec *models.JSON `toml:"externalInitiatorSpec"`
	CreatedAt             time.Time    `json:"createdAt" toml:"-"`
	UpdatedAt             time.Time    `json:"updatedAt" toml:"-"`
}

func (*WebhookSpec) BeforeCreate added in v0.10.8

func (w *WebhookSpec) BeforeCreate(db *gorm.DB) error

func (*WebhookSpec) BeforeSave added in v0.10.8

func (w *WebhookSpec) BeforeSave(db *gorm.DB) error

func (WebhookSpec) GetID added in v0.10.8

func (w WebhookSpec) GetID() string

func (*WebhookSpec) SetID added in v0.10.8

func (w *WebhookSpec) SetID(value string) error

func (WebhookSpec) TableName added in v0.10.8

func (WebhookSpec) TableName() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL