Documentation ¶
Index ¶
- Constants
- type CfgOptions
- type Config
- type Configurer
- type Informer
- type Job
- func (j *Job) AutoAck() bool
- func (j *Job) Delay() int64
- func (j *Job) GroupID() string
- func (j *Job) Headers() map[string][]string
- func (j *Job) ID() string
- func (j *Job) Metadata() string
- func (j *Job) Name() string
- func (j *Job) Offset() int64
- func (j *Job) Partition() int32
- func (j *Job) Payload() []byte
- func (j *Job) Priority() int64
- func (j *Job) Topic() string
- func (j *Job) UpdatePriority(p int64)
- type Logger
- type Options
- type Pipeline
- func (p Pipeline) Bool(name string, d bool) bool
- func (p Pipeline) Driver() string
- func (p Pipeline) Get(key string) any
- func (p Pipeline) Has(name string) bool
- func (p Pipeline) Int(name string, d int) int
- func (p Pipeline) Map(name string, out map[string]string) error
- func (p Pipeline) Name() string
- func (p Pipeline) Priority() int64
- func (p Pipeline) String(name string, d string) string
- func (p Pipeline) With(name string, value any)
- type Plugin
- func (p *Plugin) AddWorker() error
- func (p *Plugin) Collects() []*dep.In
- func (p *Plugin) Declare(ctx context.Context, pipeline jobsApi.Pipeline) error
- func (p *Plugin) Destroy(ctx context.Context, pp string) error
- func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error
- func (p *Plugin) JobsState(ctx context.Context) ([]*jobsApi.State, error)
- func (p *Plugin) List() []string
- func (p *Plugin) MetricsCollector() []prometheus.Collector
- func (p *Plugin) Name() string
- func (p *Plugin) Pause(ctx context.Context, pp string) error
- func (p *Plugin) Push(ctx context.Context, j jobsApi.Message) error
- func (p *Plugin) PushBatch(ctx context.Context, j []jobsApi.Message) error
- func (p *Plugin) RPC() any
- func (p *Plugin) Ready() (*status.Status, error)
- func (p *Plugin) RemoveWorker(ctx context.Context) error
- func (p *Plugin) Reset() error
- func (p *Plugin) Resume(ctx context.Context, pp string) error
- func (p *Plugin) Serve() chan error
- func (p *Plugin) Status() (*status.Status, error)
- func (p *Plugin) Stop(ctx context.Context) error
- func (p *Plugin) Workers() []*process.State
- type Pool
- type Server
- type StatsExporter
- type Tracer
Constants ¶
View Source
const ( // RrMode env variable RrMode string = "RR_MODE" RrModeJobs string = "jobs" PluginName string = "jobs" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CfgOptions ¶
type CfgOptions struct { // Parallelism configures the number of pipelines to be started at the same time Parallelism int `mapstructure:"parallelism"` }
type Config ¶
type Config struct { // NumPollers configures number of priority queue pollers // Default - num logical cores NumPollers int `mapstructure:"num_pollers"` // Options contain additional configuration options for the job plugin CfgOptions *CfgOptions `mapstructure:"options"` // PipelineSize is the limit of a main jobs queue which consumes Items from the driver's pipeline // a Driver pipeline might be much larger than a main jobs queue PipelineSize uint64 `mapstructure:"pipeline_size"` // Timeout in seconds is the per-push limit to put the job into the queue Timeout int `mapstructure:"timeout"` // Pool configures roadrunner workers pool. Pool *poolImpl.Config `mapstructure:"pool"` // Pipelines define mapping between PHP job pipeline and associated job broker. Pipelines map[string]Pipeline `mapstructure:"pipelines"` // Consuming specifies names of pipelines to be consumed on service start. Consume []string `mapstructure:"consume"` }
Config defines settings for the job broker, workers, and the job-pipeline mapping.
func (*Config) InitDefaults ¶
func (c *Config) InitDefaults()
type Configurer ¶
type Job ¶
type Job struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` // Ident is unique identifier of the job, should be provided from outside Ident string `json:"id"` // Payload is string data (usually JSON) passed to Job broker. Pld []byte `json:"payload"` // Headers with key-value pairs Hdr map[string][]string `json:"headers"` // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` }
func (*Job) UpdatePriority ¶
type Options ¶
type Options struct { // Priority is job priority, default - 10 // pointer to distinguish 0 as a priority and nil as priority not set Priority int64 `json:"priority"` // Pipeline manually specified pipeline. Pipeline string `json:"pipeline,omitempty"` // Delay defines time duration to delay execution for. Defaults to none. Delay int64 `json:"delay,omitempty"` // AutoAck use to ack a job right after it arrived from the driver AutoAck bool `json:"auto_ack"` // kafka specific fields // Topic is kafka topic Topic string `json:"topic"` // Optional metadata Metadata string `json:"metadata"` // Kafka offset Offset int64 `json:"offset"` // Kafka partition Partition int32 `json:"partition"` }
Options carry information about how to handle a given job.
func (*Options) DelayDuration ¶
DelayDuration returns the delay in the form of a time.Duration.
type Pipeline ¶
Pipeline defines pipeline options.
func (Pipeline) Map ¶
Map must return a nested map value or an empty config. There might be SQS attributes or tags, for example.
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
func (*Plugin) MetricsCollector ¶
func (p *Plugin) MetricsCollector() []prometheus.Collector
type Pool ¶
type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*worker.Process) // Exec payload Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *staticPool.PExec, error) // RemoveWorker removes worker from the pool. RemoveWorker(ctx context.Context) error // AddWorker adds worker to the pool. AddWorker() error // Reset kill all workers inside the watcher and replaces with new Reset(ctx context.Context) error // Destroy all underlying stack (but let them complete the task). Destroy(ctx context.Context) }
type Server ¶
type Server interface {
NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*staticPool.Pool, error)
}
Server creates workers for the application.
type StatsExporter ¶
type StatsExporter struct { TotalMemoryDesc *prometheus.Desc StateDesc *prometheus.Desc WorkerMemoryDesc *prometheus.Desc TotalWorkersDesc *prometheus.Desc WorkersReady *prometheus.Desc WorkersWorking *prometheus.Desc WorkersInvalid *prometheus.Desc Workers Informer }
func (*StatsExporter) Collect ¶
func (s *StatsExporter) Collect(ch chan<- prometheus.Metric)
func (*StatsExporter) Describe ¶
func (s *StatsExporter) Describe(d chan<- *prometheus.Desc)
type Tracer ¶
type Tracer interface {
Tracer() *sdktrace.TracerProvider
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.