jobs

package module
v5.0.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: MIT Imports: 35 Imported by: 1

README

Documentation

Index

Constants

View Source
const (
	// RrMode is the environment variable name used to pass the RoadRunner mode to workers.
	RrMode     string = "RR_MODE"
	// RrModeJobs is the RrMode value identifying the jobs mode.
	RrModeJobs string = "jobs"

	// PluginName is the name this plugin is registered under.
	PluginName string = "jobs"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CfgOptions

// CfgOptions holds additional, optional settings for the jobs plugin.
type CfgOptions struct {
	// Parallelism configures the number of pipelines to be started at the same time
	Parallelism int `mapstructure:"parallelism"`
}

type Config

// Config defines settings for the job broker, workers and job-pipeline mapping.
type Config struct {
	// NumPollers configures number of priority queue pollers
	// Default - num logical cores
	NumPollers int `mapstructure:"num_pollers"`
	// CfgOptions contains additional configuration options for the job plugin
	CfgOptions *CfgOptions `mapstructure:"options"`
	// PipelineSize is the limit of a main jobs queue which consumes Items from the driver's pipeline
	// a Driver pipeline might be much larger than a main jobs queue
	PipelineSize uint64 `mapstructure:"pipeline_size"`
	// Timeout in seconds is the per-push limit to put the job into the queue
	Timeout int `mapstructure:"timeout"`
	// Pool configures roadrunner workers pool.
	Pool *poolImpl.Config `mapstructure:"pool"`
	// Pipelines define mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]Pipeline `mapstructure:"pipelines"`
	// Consume specifies names of pipelines to be consumed on service start.
	Consume []string `mapstructure:"consume"`
}

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) InitDefaults

func (c *Config) InitDefaults()

type Configurer

// Configurer is the subset of the configuration plugin this plugin depends on.
type Configurer interface {
	// Experimental checks if there are any experimental features enabled.
	Experimental() bool
	// UnmarshalKey takes a single key and unmarshals it into a struct.
	UnmarshalKey(name string, out any) error
	// Has checks if config section exists.
	Has(name string) bool
}

type Informer

// Informer exposes the current states of the plugin's worker processes.
type Informer interface {
	// Workers returns the state of every worker process.
	Workers() []*process.State
}

type Job

// Job is the internal representation of a single queued job.
type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`
	// Ident is unique identifier of the job, should be provided from outside
	Ident string `json:"id"`
	// Pld is the raw payload bytes (usually JSON) passed to the Job broker.
	Pld []byte `json:"payload"`
	// Hdr holds job headers as key -> list-of-values pairs
	Hdr map[string][]string `json:"headers"`
	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

func (*Job) AutoAck

func (j *Job) AutoAck() bool

func (*Job) Delay

func (j *Job) Delay() int64

func (*Job) GroupID

func (j *Job) GroupID() string

func (*Job) Headers

func (j *Job) Headers() map[string][]string

func (*Job) ID

func (j *Job) ID() string

func (*Job) Metadata

func (j *Job) Metadata() string

func (*Job) Name

func (j *Job) Name() string

func (*Job) Offset

func (j *Job) Offset() int64

Kafka Options ---------------------------------

func (*Job) Partition

func (j *Job) Partition() int32

func (*Job) Payload

func (j *Job) Payload() []byte

func (*Job) Priority

func (j *Job) Priority() int64

func (*Job) Topic

func (j *Job) Topic() string

func (*Job) UpdatePriority

func (j *Job) UpdatePriority(p int64)

type Logger

// Logger produces named zap loggers for plugins.
type Logger interface {
	// NamedLogger returns a logger scoped to the given name.
	NamedLogger(name string) *zap.Logger
}

type Options

// Options carry information about how to handle a given job.
type Options struct {
	// Priority is job priority, default - 10
	// NOTE(review): the field is a plain int64, so a zero value cannot be
	// distinguished from "priority not set" (a stale comment previously
	// claimed a pointer was used for that purpose) — confirm intended semantics.
	Priority int64 `json:"priority"`
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`
	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`
	// AutoAck use to ack a job right after it arrived from the driver
	AutoAck bool `json:"auto_ack"`

	// kafka specific fields
	// Topic is kafka topic
	Topic string `json:"topic"`
	// Metadata is optional, driver-specific metadata
	Metadata string `json:"metadata"`
	// Offset is the Kafka offset
	Offset int64 `json:"offset"`
	// Partition is the Kafka partition
	Partition int32 `json:"partition"`
}

Options carry information about how to handle given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type Pipeline

// Pipeline defines pipeline options as an untyped key/value map.
type Pipeline map[string]any

Pipeline defines pipeline options.

func (Pipeline) Bool

func (p Pipeline) Bool(name string, d bool) bool

Bool must return option value as bool or return default value.

func (Pipeline) Driver

func (p Pipeline) Driver() string

Driver associated with the pipeline.

func (Pipeline) Get

func (p Pipeline) Get(key string) any

Get is used to get the data associated with the key.

func (Pipeline) Has

func (p Pipeline) Has(name string) bool

Has checks if value presented in pipeline.

func (Pipeline) Int

func (p Pipeline) Int(name string, d int) int

Int must return option value as int or return default value.

func (Pipeline) Map

func (p Pipeline) Map(name string, out map[string]string) error

Map must return nested map value or empty config. There might be sqs attributes or tags, for example

func (Pipeline) Name

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) Priority

func (p Pipeline) Priority() int64

Priority returns the default pipeline priority.

func (Pipeline) String

func (p Pipeline) String(name string, d string) string

String must return option value as string or return default value.

func (Pipeline) With

func (p Pipeline) With(name string, value any)

With associates a value with the given name in the pipeline.

type Plugin

// Plugin is the jobs plugin implementation; its state is private to the package.
type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) AddWorker

func (p *Plugin) AddWorker() error

func (*Plugin) Collects

func (p *Plugin) Collects() []*dep.In

func (*Plugin) Declare

func (p *Plugin) Declare(ctx context.Context, pipeline jobsApi.Pipeline) error

Declare a pipeline.

func (*Plugin) Destroy

func (p *Plugin) Destroy(ctx context.Context, pp string) error

Destroy the pipeline and release all associated resources.

func (*Plugin) Init

func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error

func (*Plugin) JobsState

func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)

func (*Plugin) List

func (p *Plugin) List() []string

func (*Plugin) MetricsCollector

func (p *Plugin) MetricsCollector() []prometheus.Collector

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Pause

func (p *Plugin) Pause(ctx context.Context, pp string) error

func (*Plugin) Push

func (p *Plugin) Push(ctx context.Context, j jobsApi.Message) error

func (*Plugin) PushBatch

func (p *Plugin) PushBatch(ctx context.Context, j []jobsApi.Message) error

func (*Plugin) RPC

func (p *Plugin) RPC() any

func (*Plugin) Ready

func (p *Plugin) Ready() (*status.Status, error)

Ready returns the readiness status of the particular plugin.

func (*Plugin) RemoveWorker

func (p *Plugin) RemoveWorker(ctx context.Context) error

func (*Plugin) Reset

func (p *Plugin) Reset() error

func (*Plugin) Resume

func (p *Plugin) Resume(ctx context.Context, pp string) error

func (*Plugin) Serve

func (p *Plugin) Serve() chan error

func (*Plugin) Status

func (p *Plugin) Status() (*status.Status, error)

Status returns the status of the particular plugin.

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) error

func (*Plugin) Workers

func (p *Plugin) Workers() []*process.State

type Pool

// Pool abstracts a RoadRunner worker pool capable of executing payloads.
type Pool interface {
	// Workers returns worker list associated with the pool.
	Workers() (workers []*worker.Process)
	// Exec executes the payload, streaming results on the returned channel.
	Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error)
	// RemoveWorker removes worker from the pool.
	RemoveWorker(ctx context.Context) error
	// AddWorker adds worker to the pool.
	AddWorker() error
	// Reset kills all workers inside the watcher and replaces them with new ones
	Reset(ctx context.Context) error
	// Destroy all underlying stack (but let them complete the task).
	Destroy(ctx context.Context)
}

type Server

// Server creates workers for the application.
type Server interface {
	// NewPool creates a new static worker pool from the given config and environment.
	NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
}

Server creates workers for the application.

type StatsExporter

// StatsExporter exports worker statistics as Prometheus metrics.
type StatsExporter struct {
	// Metric descriptors for per-worker/pool measurements.
	TotalMemoryDesc  *prometheus.Desc
	StateDesc        *prometheus.Desc
	WorkerMemoryDesc *prometheus.Desc
	TotalWorkersDesc *prometheus.Desc

	// Metric descriptors for aggregated worker-state counts.
	WorkersReady   *prometheus.Desc
	WorkersWorking *prometheus.Desc
	WorkersInvalid *prometheus.Desc

	// Workers supplies the worker states to be collected.
	Workers Informer
}

func (*StatsExporter) Collect

func (s *StatsExporter) Collect(ch chan<- prometheus.Metric)

func (*StatsExporter) Describe

func (s *StatsExporter) Describe(d chan<- *prometheus.Desc)

type Tracer

// Tracer provides access to an OpenTelemetry SDK tracer provider.
type Tracer interface {
	// Tracer returns the configured tracer provider.
	Tracer() *sdktrace.TracerProvider
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL