streams

package
v2.9.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExtractBigInt

func ExtractBigInt(trrs pipeline.TaskRunResults) (*big.Int, error)

ExtractBigInt returns the result of a pipeline run that yields a single decimal result, as a *big.Int. It serves as a reference/example method; other methods can be implemented to extract any desired type that matches a particular pipeline run's output. It returns an error on parse errors (if the results are of the wrong type).

func ValidatedStreamSpec

func ValidatedStreamSpec(tomlString string) (job.Job, error)

Types

type Delegate

type Delegate struct {
	// contains filtered or unexported fields
}

func NewDelegate

func NewDelegate(lggr logger.Logger, registry Registry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate

func (*Delegate) AfterJobCreated

func (d *Delegate) AfterJobCreated(jb job.Job)

func (*Delegate) BeforeJobCreated

func (d *Delegate) BeforeJobCreated(jb job.Job)

func (*Delegate) BeforeJobDeleted

func (d *Delegate) BeforeJobDeleted(jb job.Job)

func (*Delegate) JobType

func (d *Delegate) JobType() job.Type

func (*Delegate) OnDeleteJob

func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error

func (*Delegate) ServicesForSpec

func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error)

type DelegateConfig

type DelegateConfig interface {
	MaxSuccessfulRuns() uint64
	ResultWriteQueueDepth() uint64
}

type Registry

type Registry interface {
	Get(streamID StreamID) (strm Stream, exists bool)
	Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error
	Unregister(streamID StreamID)
}

func NewRegistry

func NewRegistry(lggr logger.Logger, runner Runner) Registry

type ResultRunSaver

type ResultRunSaver interface {
	Save(run *pipeline.Run)
}

type RunResultSaver

type RunResultSaver interface {
	Save(run *pipeline.Run)
}

type Runner

type Runner interface {
	ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
	InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error)
}

type Stream

type Stream interface {
	Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error)
}

func NewStream

func NewStream(lggr logger.Logger, id StreamID, spec pipeline.Spec, runner Runner, rrs RunResultSaver) Stream

type StreamID

type StreamID = string

type StreamService

type StreamService struct {
	// contains filtered or unexported fields
}

func (*StreamService) Close

func (s *StreamService) Close() error

func (*StreamService) Start

func (s *StreamService) Start(_ context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL