boltjobs

package
v2.7.1 Latest
Warning

This package is not in the latest version of its module.
Published: Jan 13, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Constants

const (
	PluginName string = "boltdb"

	PushBucket    string = "push"
	InQueueBucket string = "processing"
	DelayBucket   string = "delayed"
)

Variables

This section is empty.

Functions

func FromPipeline

func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Configurer, pq priorityqueue.Queue) (*consumer, error)

func NewBoltDBJobs

func NewBoltDBJobs(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer, pq priorityqueue.Queue) (*consumer, error)

Types

type Item

type Item struct {
	// Job contains the name of the job broker (usually a PHP class).
	Job string `json:"job"`

	// Ident is unique identifier of the job, should be provided from outside
	Ident string `json:"id"`

	// Payload is string data (usually JSON) passed to Job broker.
	Payload string `json:"payload"`

	// Headers holds key-value pairs.
	Headers map[string][]string `json:"headers"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

func (*Item) Ack

func (i *Item) Ack() error

func (*Item) Body

func (i *Item) Body() []byte

func (*Item) Context

func (i *Item) Context() ([]byte, error)

func (*Item) ID

func (i *Item) ID() string

func (*Item) Nack

func (i *Item) Nack() error

func (*Item) Priority

func (i *Item) Priority() int64

func (*Item) Requeue

func (i *Item) Requeue(headers map[string][]string, delay int64) error

Requeue algorithm:

  1. Rewrite the item's headers and delay.
  2. Begin a writable transaction on the db attached to the item.
  3. Delete the item from the InQueueBucket.
  4. If the item has a delay:
     4.1. Get the DelayBucket.
     4.2. Make a key by adding the delay to time.Now(), formatted as RFC3339.
     4.3. Put this key with the value into the DelayBucket.
  5. Without a delay, put the key with the value into the PushBucket (requeue).
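The steps above can be sketched roughly as follows. This is not the package's implementation: the `store` type is a hypothetical in-memory stand-in for the BoltDB file, `requeue` and `example` are illustrative names, and treating the delay as a number of seconds is an assumption the documentation does not confirm. Header rewriting (step 1) and the writable transaction (step 2) are left out; a real driver would perform steps 3 to 5 inside a single transaction.

```go
package main

import (
	"fmt"
	"time"
)

// Bucket names mirror the package constants.
const (
	PushBucket    = "push"
	InQueueBucket = "processing"
	DelayBucket   = "delayed"
)

// store is a hypothetical stand-in for the database:
// bucket name -> key -> value.
type store map[string]map[string][]byte

func newStore() store {
	return store{
		PushBucket:    map[string][]byte{},
		InQueueBucket: map[string][]byte{},
		DelayBucket:   map[string][]byte{},
	}
}

// requeue follows steps 3 to 5 of the documented algorithm.
// The delay is assumed to be in seconds; now is passed in so the
// result is deterministic.
func requeue(s store, id string, value []byte, delay int64, now time.Time) string {
	// Step 3: the item is no longer being processed.
	delete(s[InQueueBucket], id)

	if delay > 0 {
		// Step 4: key the item by its due time in RFC3339 format.
		due := now.Add(time.Duration(delay) * time.Second)
		key := due.UTC().Format(time.RFC3339)
		s[DelayBucket][key] = value
		return key
	}

	// Step 5: no delay, so the item goes straight back to the push bucket.
	s[PushBucket][id] = value
	return id
}

// example requeues one in-flight item against a fixed clock.
func example(delay int64) (string, store) {
	s := newStore()
	s[InQueueBucket]["job-1"] = []byte("payload")
	now := time.Date(2022, time.January, 13, 12, 0, 0, 0, time.UTC)
	key := requeue(s, "job-1", []byte("payload"), delay, now)
	return key, s
}

func main() {
	key, s := example(30)
	fmt.Println(key, len(s[InQueueBucket]), len(s[DelayBucket]), len(s[PushBucket]))
}
```

Keying delayed items by an RFC3339 timestamp means the keys sort in due-time order as long as they share one time zone, which is why the sketch normalizes to UTC before formatting.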

func (*Item) Respond

func (i *Item) Respond(_ []byte, _ string) error

type Options

type Options struct {
	// Priority is job priority, default - 10
	// pointer to distinguish 0 as a priority and nil as priority not set
	Priority int64 `json:"priority"`

	// Pipeline is a manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`
	// contains filtered or unexported fields
}

Options carries information about how to handle a given job.
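To illustrate how the JSON struct tags on Item and Options map to a payload, here is a minimal sketch using local copies of the exported fields shown above. The helper names `decodeItem` and `encodeItem` and the sample values are hypothetical, and the documentation does not state whether the package itself stores items as JSON; the sketch only shows what the tags imply for `encoding/json`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Options is a local copy of the documented exported fields; the
// package's type also has unexported fields that are not reproduced.
type Options struct {
	Priority int64  `json:"priority"`
	Pipeline string `json:"pipeline,omitempty"`
	Delay    int64  `json:"delay,omitempty"`
}

// Item is a local copy of the documented struct.
type Item struct {
	Job     string              `json:"job"`
	Ident   string              `json:"id"`
	Payload string              `json:"payload"`
	Headers map[string][]string `json:"headers"`
	Options *Options            `json:"options,omitempty"`
}

// decodeItem parses a JSON document into an Item.
func decodeItem(raw []byte) (*Item, error) {
	var it Item
	if err := json.Unmarshal(raw, &it); err != nil {
		return nil, err
	}
	return &it, nil
}

// encodeItem renders an Item as a JSON string.
func encodeItem(it Item) (string, error) {
	out, err := json.Marshal(it)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	raw := []byte(`{"job":"ping","id":"job-1","payload":"{}",` +
		`"headers":{"attempt":["1"]},` +
		`"options":{"priority":10,"pipeline":"local","delay":5}}`)

	it, err := decodeItem(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(it.Ident, it.Options.Priority, it.Options.Pipeline, it.Options.Delay)

	// With a nil Options pointer, omitempty drops the "options" key.
	out, err := encodeItem(Item{Job: "ping", Ident: "job-2", Payload: "{}"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Note that the Ident field is serialized under the key `id`, and that a nil Headers map is still emitted (as `null`) because its tag has no `omitempty`.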
