amqp

package
v2.4.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2021 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config is used to parse pipeline configuration. Every field is mapped to
// the configuration key named in its `mapstructure` tag.
//
// NOTE(review): MultipleAck is bound to the key "multiple_ask", not
// "multiple_ack". This looks like a typo, but the tag is the key users write
// in their configuration, so renaming it would silently break existing
// setups — confirm before changing.
type Config struct {
	Prefetch      int    `mapstructure:"prefetch"`
	Queue         string `mapstructure:"queue"`
	Priority      int64  `mapstructure:"priority"`
	Exchange      string `mapstructure:"exchange"`
	ExchangeType  string `mapstructure:"exchange_type"`
	RoutingKey    string `mapstructure:"routing_key"`
	Exclusive     bool   `mapstructure:"exclusive"`
	MultipleAck   bool   `mapstructure:"multiple_ask"`
	RequeueOnFail bool   `mapstructure:"requeue_on_fail"`
}

Config is used to parse pipeline configuration

func (*Config) InitDefault

func (c *Config) InitDefault()

type GlobalCfg

// GlobalCfg carries a single setting, Addr, mapped to the "addr"
// configuration key.
// NOTE(review): Addr is presumably the AMQP broker connection address —
// confirm against the code that reads it.
type GlobalCfg struct {
	Addr string `mapstructure:"addr"`
}

func (*GlobalCfg) InitDefault

func (c *GlobalCfg) InitDefault()

type Item

// Item describes a single job together with its headers and options. All
// exported fields carry JSON tags.
type Item struct {
	// Job contains the name of the job broker (usually a PHP class).
	Job string `json:"job"`

	// Ident is the unique identifier of the job; it should be provided from
	// outside. Note that it is serialized under the JSON key "id".
	Ident string `json:"id"`

	// Payload is string data (usually JSON) passed to the Job broker.
	Payload string `json:"payload"`

	// Headers holds key-value pairs; each key maps to a list of string values.
	Headers map[string][]string `json:"headers"`

	// Options contains the set of options specific to job execution. Can be
	// empty (nil); the JSON tag marks it omitempty.
	Options *Options `json:"options,omitempty"`
}

func (*Item) Ack

func (i *Item) Ack() error

func (*Item) Body

func (i *Item) Body() []byte

Body packs job payload into binary payload.

func (*Item) Context

func (i *Item) Context() ([]byte, error)

Context packs the job context (job, id) into a binary payload. It is not used by the amqp driver; amqp.Table is used instead.

func (*Item) ID

func (i *Item) ID() string

func (*Item) Nack

func (i *Item) Nack() error

func (*Item) Priority

func (i *Item) Priority() int64

func (*Item) Requeue

func (i *Item) Requeue(headers map[string][]string, delay int64) error

Requeue with the provided delay, handled by the Nack

type JobConsumer

// JobConsumer is the consumer type returned by NewAMQPConsumer and
// FromPipeline. It embeds sync.Mutex, so a JobConsumer value must not be
// copied; use it through *JobConsumer, which is also the receiver type of
// all its methods.
type JobConsumer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func FromPipeline

func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error)

func NewAMQPConsumer

func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error)

NewAMQPConsumer initializes a RabbitMQ pipeline.

func (*JobConsumer) Pause

func (j *JobConsumer) Pause(_ context.Context, p string)

func (*JobConsumer) Push

func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error

func (*JobConsumer) Register

func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error

func (*JobConsumer) Resume

func (j *JobConsumer) Resume(_ context.Context, p string)

func (*JobConsumer) Run

func (*JobConsumer) State

func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error)

func (*JobConsumer) Stop

func (j *JobConsumer) Stop(context.Context) error

type Options

// Options carry information about how to handle a given job.
type Options struct {
	// Priority is the job priority.
	// NOTE(review): an earlier note gave the default as 10 and described this
	// field as a pointer (nil meaning "priority not set"). The field is a
	// plain int64, so an unset priority cannot be told apart from 0 at this
	// level — confirm where the default is actually applied.
	Priority int64 `json:"priority"`

	// Pipeline is the manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines how long to delay execution for. Defaults to none.
	// The unit is not stated in this declaration; DelayDuration returns the
	// delay as a time.Duration.
	Delay int64 `json:"delay,omitempty"`
	// contains filtered or unexported fields
}

Options carry information about how to handle given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type Plugin

// Plugin is the plugin type of this package. Its JobsConstruct and
// FromPipeline methods both return a jobs.Consumer.
type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Available

func (p *Plugin) Available()

func (*Plugin) FromPipeline

func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error)

FromPipeline constructs an AMQP driver from the pipeline.

func (*Plugin) Init

func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error

func (*Plugin) JobsConstruct

func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error)

func (*Plugin) Name

func (p *Plugin) Name() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL