kafkajobs

package
v3.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompressionCodec

type CompressionCodec string

type Configurer

// Configurer is the configuration accessor accepted by NewKafkaConsumer and
// FromPipeline (passed as their cfg argument).
type Configurer interface {
	// UnmarshalKey takes a single key, name, and unmarshals it into out.
	// NOTE(review): out is presumably expected to be a pointer to a struct — confirm against the implementation.
	UnmarshalKey(name string, out any) error

	// Has reports whether the config section called name exists.
	Has(name string) bool
}

type Consumer

// Consumer is the type constructed by NewKafkaConsumer and FromPipeline.
// Its fields are not exported; it is used through its methods.
type Consumer struct {
	// contains filtered or unexported fields
}

func FromPipeline

func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg Configurer, pq priorityqueue.Queue) (*Consumer, error)

FromPipeline initializes the consumer on the fly from the given pipeline.

func NewKafkaConsumer

func NewKafkaConsumer(configKey string, log *zap.Logger, cfg Configurer, pq priorityqueue.Queue) (*Consumer, error)

NewKafkaConsumer initializes a Kafka pipeline from the configuration.

func (*Consumer) Pause

func (c *Consumer) Pause(_ context.Context, p string)

func (*Consumer) Push

func (c *Consumer) Push(ctx context.Context, job *jobs.Job) error

func (*Consumer) Register

func (c *Consumer) Register(_ context.Context, p *pipeline.Pipeline) error

func (*Consumer) Resume

func (c *Consumer) Resume(_ context.Context, p string)

func (*Consumer) Run

func (*Consumer) State

func (c *Consumer) State(context.Context) (*jobs.State, error)

func (*Consumer) Stop

func (c *Consumer) Stop(context.Context) error

type ConsumerOpts

// ConsumerOpts groups consumer-side settings. Each field is populated from the
// configuration key named in its mapstructure tag.
type ConsumerOpts struct {
	// MaxFetchMessageSize is read from "max_fetch_message_size".
	// NOTE(review): presumably a size in bytes — confirm against the code that consumes it.
	MaxFetchMessageSize int32 `mapstructure:"max_fetch_message_size"`
	// MinFetchMessageSize is read from "min_fetch_message_size".
	// NOTE(review): presumably a size in bytes — confirm against the code that consumes it.
	MinFetchMessageSize int32 `mapstructure:"min_fetch_message_size"`
	// SessionTimeout is read from "session_timeout".
	// NOTE(review): the time unit is not visible here — confirm before documenting it.
	SessionTimeout      int   `mapstructure:"session_timeout"`
	// HeartbeatInterval is read from "heartbeat_interval".
	// NOTE(review): the time unit is not visible here — confirm before documenting it.
	HeartbeatInterval   int   `mapstructure:"heartbeat_interval"`
}

type CreateTopics

// CreateTopics groups topic-creation settings. Each field is populated from the
// configuration key named in its mapstructure tag.
type CreateTopics struct {
	// ReplicationFactor is read from "replication_factor".
	ReplicationFactor int16              `mapstructure:"replication_factor"`
	// ReplicaAssignment is read from "replica_assignment".
	// NOTE(review): presumably maps a partition number to the broker IDs holding its replicas — confirm.
	ReplicaAssignment map[int32][]int32  `mapstructure:"replica_assignment"`
	// ConfigEntries is read from "config_entries". Values are *string, so an
	// entry can be present with a nil value.
	// NOTE(review): presumably topic-level config overrides — confirm.
	ConfigEntries     map[string]*string `mapstructure:"config_entries"`
}

type Item

// Item is the JSON-serializable representation of a job: its name, identifier,
// payload, headers and optional execution options.
type Item struct {
	// Job contains the name of the job broker (usually a PHP class).
	Job string `json:"job"`

	// Ident is the unique identifier of the job; it should be provided from outside.
	Ident string `json:"id"`

	// Payload is string data (usually JSON) passed to the job broker.
	Payload string `json:"payload"`

	// Headers holds key-value pairs; each key may carry multiple values.
	Headers map[string][]string `json:"headers"`

	// Options contains the set of options specific to job execution.
	// May be nil, in which case it is omitted from the JSON encoding.
	Options *Options `json:"options,omitempty"`
}

func (*Item) Ack

func (i *Item) Ack() error

func (*Item) Body

func (i *Item) Body() []byte

Body packs the job payload into a binary payload.

func (*Item) Context

func (i *Item) Context() ([]byte, error)

Context packs the job context (job, id) into a binary payload. It is not used by the amqp driver, which uses amqp.Table instead.

func (*Item) Copy

func (i *Item) Copy() *Item

func (*Item) ID

func (i *Item) ID() string

func (*Item) Nack

func (i *Item) Nack() error

func (*Item) Priority

func (i *Item) Priority() int64

func (*Item) Requeue

func (i *Item) Requeue(headers map[string][]string, _ int64) error

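Requeue re-queues the job with the provided headers. The delay parameter is declared as `_` and is therefore unused by this method; per the original note, the requeue is handled by Nack.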

func (*Item) Respond

func (i *Item) Respond(_ []byte, _ string) error

Respond is not used; it is present only to satisfy the Job interface.

type JobKVEncoder

// JobKVEncoder exposes Encode, which returns the encoded bytes or an error, and
// Length, which returns an int. Its fields are not exported.
// NOTE(review): what exactly is encoded is not visible here — confirm against the implementation.
type JobKVEncoder struct {
	// contains filtered or unexported fields
}

func (JobKVEncoder) Encode

func (j JobKVEncoder) Encode() ([]byte, error)

func (JobKVEncoder) Length

func (j JobKVEncoder) Length() int

type Options

// Options carries per-job execution settings; it is referenced from Item.Options.
type Options struct {
	// Priority is the job priority, default - 10.
	// NOTE(review): an earlier comment described this field as a pointer used to
	// distinguish priority 0 from "not set". The field is a plain int64, so an
	// explicit 0 and an unset priority are indistinguishable at this level.
	Priority int64 `json:"priority"`

	// Pipeline is the manually specified pipeline. Omitted from JSON when empty.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines how long to delay execution for. Defaults to none.
	// Omitted from JSON when zero.
	// NOTE(review): the unit is not visible here (see DelayDuration) — confirm.
	Delay int64 `json:"delay,omitempty"`

	// AutoAck option.
	// NOTE(review): presumably acknowledges the job automatically when set — confirm against the consumer code.
	AutoAck bool `json:"auto_ack"`
	// contains filtered or unexported fields
}

Options carries information about how to handle a given job.

func (*Options) DelayDuration

func (o *Options) DelayDuration() time.Duration

DelayDuration returns delay duration in a form of time.Duration.

type ProducerOpts

// ProducerOpts groups producer-side settings. Each field is populated from the
// configuration key named in its mapstructure tag.
type ProducerOpts struct {
	// MaxMessageBytes is read from "max_message_bytes".
	MaxMessageBytes  int              `mapstructure:"max_message_bytes"`
	// RequiredAcks is read from "required_acks".
	// NOTE(review): a pointer, presumably so an absent key (nil) can be told apart from an explicit 0 — confirm.
	RequiredAcks     *int             `mapstructure:"required_acks"`
	// Timeout is read from "timeout".
	// NOTE(review): the time unit is not visible here — confirm before documenting it.
	Timeout          int              `mapstructure:"timeout"`
	// CompressionCodec is read from "compression_codec"; CompressionCodec is a string type.
	// NOTE(review): the accepted codec names are not visible here.
	CompressionCodec CompressionCodec `mapstructure:"compression_codec"`
	// CompressionLevel is read from "compression_level".
	// NOTE(review): a pointer, presumably so an absent key (nil) can be told apart from an explicit 0 — confirm.
	CompressionLevel *int             `mapstructure:"compression_level"`
	// Idempotent is read from "idempotent".
	Idempotent       bool             `mapstructure:"idempotent"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL