jobs

package module
v4.7.4
Published: Oct 9, 2023 License: MIT Imports: 35 Imported by: 1

Documentation

Index

Constants

const (
	// RrMode is the name of the RR_MODE env variable.
	RrMode     string = "RR_MODE"
	RrModeJobs string = "jobs"

	PluginName string = "jobs"
)
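The constants advertise the plugin name and the mode RoadRunner exposes to its workers: RR_MODE is set so a worker can detect it is serving jobs. A minimal sketch of building such an env map (the constants are repeated locally so the snippet compiles on its own; the plugin's actual wiring is not shown on this page):

package main

import "fmt"

// Mirrors the package constants above so this sketch is self-contained.
const (
	rrMode     = "RR_MODE"
	rrModeJobs = "jobs"
)

func main() {
	// Env map a jobs worker pool could be started with; the PHP worker
	// reads RR_MODE to discover which plugin started it.
	env := map[string]string{rrMode: rrModeJobs}
	fmt.Println(env[rrMode]) // jobs
}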

Variables

This section is empty.

Functions

This section is empty.

Types

type CfgOptions added in v4.6.1

type CfgOptions struct {
	// Parallelism configures the number of pipelines to be started at the same time.
	Parallelism int `mapstructure:"parallelism"`
}

type Config

type Config struct {
	// NumPollers configures the number of priority queue pollers.
	// Default: the number of logical CPU cores.
	NumPollers int `mapstructure:"num_pollers"`
	// CfgOptions contains additional configuration options for the jobs plugin.
	CfgOptions *CfgOptions `mapstructure:"options"`
	// PipelineSize is the limit of the main jobs queue, which consumes items from the driver's pipeline.
	// The driver's pipeline might be much larger than the main jobs queue.
	PipelineSize uint64 `mapstructure:"pipeline_size"`
	// Timeout, in seconds, is the per-push limit for putting a job into the queue.
	Timeout int `mapstructure:"timeout"`
	// Pool configures the RoadRunner workers pool.
	Pool *poolImpl.Config `mapstructure:"pool"`
	// Pipelines defines the mapping between a PHP job pipeline and its associated job broker.
	Pipelines map[string]Pipeline `mapstructure:"pipelines"`
	// Consume specifies the names of pipelines to be consumed on service start.
	Consume []string `mapstructure:"consume"`
}

Config defines settings for the job broker, workers, and job-pipeline mapping.

func (*Config) InitDefaults

func (c *Config) InitDefaults()

type Configurer

type Configurer interface {
	// UnmarshalKey takes a single key and unmarshals it into a struct.
	UnmarshalKey(name string, out any) error
	// Has checks if config section exists.
	Has(name string) bool
}
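Taken together with Config, a typical Init-style flow reads the "jobs" section through a Configurer, unmarshals it into Config, and then applies defaults. The sketch below is written as if it lived inside this package and uses only the types shown on this page; the error handling is illustrative, not the plugin's actual behavior:

func initConfig(cfg Configurer) (*Config, error) {
	// Nothing to do when the "jobs" section is absent from the configuration.
	if !cfg.Has(PluginName) {
		return nil, nil
	}

	conf := &Config{}
	if err := cfg.UnmarshalKey(PluginName, conf); err != nil {
		return nil, err
	}

	// Fill unset fields (number of pollers, pipeline size, timeout, ...) with defaults.
	conf.InitDefaults()

	return conf, nil
}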

type Informer

type Informer interface {
	Workers() []*process.State
}

type Job

type Job struct {
	// Job contains the name of the job broker (usually a PHP class).
	Job string `json:"job"`
	// Ident is the unique identifier of the job; it should be provided from outside.
	Ident string `json:"id"`
	// Pld is the payload data (usually JSON) passed to the job broker.
	Pld []byte `json:"payload"`
	// Hdr contains the job headers as key-value pairs.
	Hdr map[string][]string `json:"headers"`
	// Options contains a set of pipeline options specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}
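For illustration, a Job might be populated as follows; every value is made up, and whether you construct a Job directly or receive it from a driver depends on how the plugin is embedded. The accessor methods listed below (ID, Name, Priority, and so on) expose these fields:

func buildJob() *Job {
	j := &Job{
		// Broker/handler name, usually a fully qualified PHP class.
		Job: "App\\Jobs\\SendEmail",
		// Unique identifier supplied by the producer.
		Ident: "9f2b6a0c-example-id",
		// JSON payload handed to the job broker.
		Pld: []byte(`{"to":"user@example.com"}`),
		// Arbitrary key-value headers.
		Hdr: map[string][]string{"trace-id": {"abc123"}},
		Options: &Options{
			Priority: 10,
			Pipeline: "emails",
			Delay:    30,
		},
	}

	return j
}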

func (*Job) AutoAck

func (j *Job) AutoAck() bool

func (*Job) Delay

func (j *Job) Delay() int64

func (*Job) GroupID added in v4.4.0

func (j *Job) GroupID() string

func (*Job) Headers

func (j *Job) Headers() map[string][]string

func (*Job) ID

func (j *Job) ID() string

func (*Job) Metadata added in v4.1.0

func (j *Job) Metadata() string

func (*Job) Name

func (j *Job) Name() string

func (*Job) Offset added in v4.1.0

func (j *Job) Offset() int64

Kafka options.

func (*Job) Partition added in v4.1.0

func (j *Job) Partition() int32

func (*Job) Payload

func (j *Job) Payload() []byte

func (*Job) Priority

func (j *Job) Priority() int64

func (*Job) Topic added in v4.1.0

func (j *Job) Topic() string

func (*Job) UpdatePriority added in v4.1.0

func (j *Job) UpdatePriority(p int64)

type Logger

type Logger interface {
	NamedLogger(name string) *zap.Logger
}

type Options

type Options struct {
	// Priority is the job priority. Default: 10.
	Priority int64 `json:"priority"`
	// Pipeline is the manually specified pipeline name.
	Pipeline string `json:"pipeline,omitempty"`
	// Delay defines the time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`
	// AutoAck is used to ack a job right after it arrives from the driver.
	AutoAck bool `json:"auto_ack"`

	// Kafka-specific fields.
	// Topic is the Kafka topic.
	Topic string `json:"topic"`
	// Metadata is optional metadata.
	Metadata string `json:"metadata"`
	// Offset is the Kafka offset.
	Offset int64 `json:"offset"`
	// Partition is the Kafka partition.
	Partition int32 `json:"partition"`
}

Options carry information about how to handle a given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type Pipeline

type Pipeline map[string]any

Pipeline defines pipeline options.

func (Pipeline) Bool

func (p Pipeline) Bool(name string, d bool) bool

Bool returns the option value as a bool or the default value.

func (Pipeline) Driver

func (p Pipeline) Driver() string

Driver associated with the pipeline.

func (Pipeline) Get

func (p Pipeline) Get(key string) any

Get returns the data associated with the key.

func (Pipeline) Has

func (p Pipeline) Has(name string) bool

Has checks whether a value is present in the pipeline.

func (Pipeline) Int

func (p Pipeline) Int(name string, d int) int

Int returns the option value as an int or the default value.

func (Pipeline) Map

func (p Pipeline) Map(name string, out map[string]string) error

Map returns a nested map value or an empty map; this might contain SQS attributes or tags, for example.

func (Pipeline) Name

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) Priority

func (p Pipeline) Priority() int64

Priority returns the default pipeline priority.

func (Pipeline) String

func (p Pipeline) String(name string, d string) string

String returns the option value as a string or the default value.

func (Pipeline) With

func (p Pipeline) With(name string, value any)

With sets a pipeline value under the given name.
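Since Pipeline is a plain map, the accessors above are convenience lookups with defaults. The key names used below ("name", "driver", "priority", "prefetch") follow the conventional RoadRunner pipeline configuration but should be treated as assumptions of this sketch:

func pipelineExample() {
	pipe := Pipeline{
		"name":     "emails",
		"driver":   "memory",
		"priority": int64(10),
		"prefetch": 100,
	}

	_ = pipe.Has("driver")              // true
	_ = pipe.Driver()                   // "memory", assuming the conventional "driver" key
	_ = pipe.Int("prefetch", 10)        // 100; the second argument is the fallback
	_ = pipe.String("queue", "default") // key is absent, so the fallback "default" is returned

	// With stores an additional option on the pipeline map.
	pipe.With("auto_ack", true)
}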

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) AddWorker added in v4.7.1

func (p *Plugin) AddWorker() error

func (*Plugin) Collects

func (p *Plugin) Collects() []*dep.In

func (*Plugin) Declare

func (p *Plugin) Declare(ctx context.Context, pipeline jobsApi.Pipeline) error

Declare a pipeline.

func (*Plugin) Destroy

func (p *Plugin) Destroy(ctx context.Context, pp string) error

Destroy pipeline and release all associated resources.

func (*Plugin) Init

func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error

func (*Plugin) JobsState

func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)

func (*Plugin) List

func (p *Plugin) List() []string

func (*Plugin) MetricsCollector

func (p *Plugin) MetricsCollector() []prometheus.Collector

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Pause

func (p *Plugin) Pause(ctx context.Context, pp string) error

func (*Plugin) Push

func (p *Plugin) Push(ctx context.Context, j jobsApi.Message) error

func (*Plugin) PushBatch

func (p *Plugin) PushBatch(ctx context.Context, j []jobsApi.Message) error
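Push and PushBatch accept the jobsApi.Message interface. The sketch below assumes that *Job satisfies that interface (its method set on this page matches the usual Message accessors) and that a running *Plugin is available from the container; both are assumptions, not guarantees of this index:

func pushOne(ctx context.Context, p *Plugin, j *Job) error {
	// *Job is assumed to implement jobsApi.Message here; if it does not,
	// wrap it in whatever Message implementation the host application uses.
	return p.Push(ctx, j)
}

func pushMany(ctx context.Context, p *Plugin, jj []*Job) error {
	// Convert to the interface slice expected by PushBatch.
	msgs := make([]jobsApi.Message, 0, len(jj))
	for _, j := range jj {
		msgs = append(msgs, j)
	}

	return p.PushBatch(ctx, msgs)
}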

func (*Plugin) RPC

func (p *Plugin) RPC() any

func (*Plugin) Ready added in v4.4.0

func (p *Plugin) Ready() (*status.Status, error)

Ready returns the readiness status of the plugin.

func (*Plugin) RemoveWorker added in v4.7.1

func (p *Plugin) RemoveWorker(ctx context.Context) error

func (*Plugin) Reset

func (p *Plugin) Reset() error

func (*Plugin) Resume

func (p *Plugin) Resume(ctx context.Context, pp string) error

func (*Plugin) Serve

func (p *Plugin) Serve() chan error

func (*Plugin) Status added in v4.4.0

func (p *Plugin) Status() (*status.Status, error)

Status returns the status of the plugin.

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) error

func (*Plugin) Workers

func (p *Plugin) Workers() []*process.State

type Pool

type Pool interface {
	// Workers returns worker list associated with the pool.
	Workers() (workers []*worker.Process)
	// Exec executes the payload on the pool and streams results on the returned channel.
	Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error)
	// RemoveWorker removes worker from the pool.
	RemoveWorker(ctx context.Context) error
	// AddWorker adds worker to the pool.
	AddWorker() error
	// Reset kills all workers inside the watcher and replaces them with new ones.
	Reset(ctx context.Context) error
	// Destroy destroys the underlying worker stack (but lets workers complete their current task).
	Destroy(ctx context.Context)
}

type Server

type Server interface {
	NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
}

Server creates workers for the application.

type Tracer added in v4.3.0

type Tracer interface {
	Tracer() *sdktrace.TracerProvider
}

