jobs

package module
v4.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2023 License: MIT Imports: 25 Imported by: 1

README

Documentation

Index

Constants

View Source
const (
	// RrMode env variable
	RrMode     string = "RR_MODE"
	RrModeJobs string = "jobs"

	PluginName string = "jobs"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NumPollers configures number of priority queue pollers
	// Should be no more than 255
	// Default - num logical cores
	NumPollers uint8 `mapstructure:"num_pollers"`

	// PipelineSize is the limit of a main jobs queue which consume Items from the drivers pipeline
	// Driver pipeline might be much larger than a main jobs queue
	PipelineSize uint64 `mapstructure:"pipeline_size"`

	// Timeout in seconds is the per-push limit to put the job into queue
	Timeout int `mapstructure:"timeout"`

	// Pool configures roadrunner workers pool.
	Pool *poolImpl.Config `mapstructure:"Pool"`

	// Pipelines defines mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]Pipeline `mapstructure:"pipelines"`

	// Consuming specifies names of pipelines to be consumed on service start.
	Consume []string `mapstructure:"consume"`
}

Config defines settings for job broker, workers and job-pipeline mapping.

func (*Config) InitDefaults

func (c *Config) InitDefaults()

type Configurer

type Configurer interface {
	// UnmarshalKey takes a single key and unmarshal it into a Struct.
	UnmarshalKey(name string, out any) error
	// Has checks if config section exists.
	Has(name string) bool
}

type Informer

type Informer interface {
	Workers() []*process.State
}

type Job

type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`

	// Ident is unique identifier of the job, should be provided from outside
	Ident string `json:"id"`

	// Payload is string data (usually JSON) passed to Job broker.
	Pld string `json:"payload"`

	// Headers with key-value pairs
	Hdr map[string][]string `json:"headers"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

func (*Job) AutoAck

func (j *Job) AutoAck() bool

func (*Job) Delay

func (j *Job) Delay() int64

func (*Job) Headers

func (j *Job) Headers() map[string][]string

func (*Job) ID

func (j *Job) ID() string

func (*Job) Metadata added in v4.1.0

func (j *Job) Metadata() string

func (*Job) Name

func (j *Job) Name() string

func (*Job) Offset added in v4.1.0

func (j *Job) Offset() int64

func (*Job) Partition added in v4.1.0

func (j *Job) Partition() int32

func (*Job) Payload

func (j *Job) Payload() string

func (*Job) Pipeline

func (j *Job) Pipeline() string

func (*Job) Priority

func (j *Job) Priority() int64

func (*Job) Topic added in v4.1.0

func (j *Job) Topic() string

func (*Job) UpdatePriority added in v4.1.0

func (j *Job) UpdatePriority(p int64)

type Logger

type Logger interface {
	NamedLogger(name string) *zap.Logger
}

type Options

type Options struct {
	// Priority is job priority, default - 10
	// pointer to distinguish 0 as a priority and nil as priority not set
	Priority int64 `json:"priority"`

	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`

	// AutoAck use to ack a job right after it arrived from the driver
	AutoAck bool `json:"auto_ack"`

	// kafka specific fields
	// Topic is kafka topic
	Topic string `json:"topic"`
	// Optional metadata
	Metadata string `json:"metadata"`
	// Kafka partition
	Partition int32 `json:"partition"`
	// Kafka offset
	Offset int64 `json:"offset"`
}

Options carry information about how to handle given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type Pipeline

type Pipeline map[string]any

Pipeline defines pipeline options.

func (Pipeline) Bool

func (p Pipeline) Bool(name string, d bool) bool

Bool must return option value as bool or return default value.

func (Pipeline) Driver

func (p Pipeline) Driver() string

Driver associated with the pipeline.

func (Pipeline) Get

func (p Pipeline) Get(key string) any

Get used to get the data associated with the key

func (Pipeline) Has

func (p Pipeline) Has(name string) bool

Has checks if value presented in pipeline.

func (Pipeline) Int

func (p Pipeline) Int(name string, d int) int

Int must return option value as string or return default value.

func (Pipeline) Map

func (p Pipeline) Map(name string, out map[string]string) error

Map must return nested map value or empty config. Here might be sqs attributes or tags for example

func (Pipeline) Name

func (p Pipeline) Name() string

Name returns pipeline name.

func (Pipeline) Priority

func (p Pipeline) Priority() int64

Priority returns default pipeline priority

func (Pipeline) String

func (p Pipeline) String(name string, d string) string

String must return option value as string or return default value.

func (Pipeline) With

func (p Pipeline) With(name string, value any)

With pipeline value

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Collects

func (p *Plugin) Collects() []*dep.In

func (*Plugin) Declare

func (p *Plugin) Declare(pipeline jobsApi.Pipeline) error

Declare a pipeline.

func (*Plugin) Destroy

func (p *Plugin) Destroy(pp string) error

Destroy pipeline and release all associated resources.

func (*Plugin) Init

func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error

func (*Plugin) JobsState

func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)

func (*Plugin) List

func (p *Plugin) List() []string

func (*Plugin) MetricsCollector

func (p *Plugin) MetricsCollector() []prometheus.Collector

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Pause

func (p *Plugin) Pause(pp string) error

func (*Plugin) Push

func (p *Plugin) Push(j jobsApi.Job) error

func (*Plugin) PushBatch

func (p *Plugin) PushBatch(j []jobsApi.Job) error

func (*Plugin) RPC

func (p *Plugin) RPC() any

func (*Plugin) Reset

func (p *Plugin) Reset() error

func (*Plugin) Resume

func (p *Plugin) Resume(pp string) error

func (*Plugin) Serve

func (p *Plugin) Serve() chan error

func (*Plugin) Stop

func (p *Plugin) Stop(context.Context) error

func (*Plugin) Workers

func (p *Plugin) Workers() []*process.State

type Pool

type Pool interface {
	// Workers returns worker list associated with the pool.
	Workers() (workers []*worker.Process)
	// Exec payload
	Exec(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
	// Reset kill all workers inside the watcher and replaces with new
	Reset(ctx context.Context) error
	// Destroy all underlying stack (but let them to complete the task).
	Destroy(ctx context.Context)
}

type Server

type Server interface {
	NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
}

Server creates workers for the application.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL