queue

package
v0.0.0-...-1fef448 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	BootstrapServers              string
	MessageTopic                  string        // topic for delivering messages
	MarkerTopic                   string        // topic for markers used for message redelivery
	RedeliveryTimeout             time.Duration // time after which messages will be redelivered (maximum 12 hours)
	ConsumerReceiveBufferCapacity uint          // capacity of the receive buffer for queue consumers
	ConsumerNextMessageTimeout    time.Duration // timeout used in the consumer loop to read messages off of Kafka
	ConsumerMaxMessagesPerCommit  uint          // the maximum number of messages consumed before offsets are committed
	RedeliveryTracker             RedeliveryTrackerConfig
}

Config contains the configuration for a Kafka-based queue implementation for task distribution

func NewConfigFromCLI

func NewConfigFromCLI() (Config, error)

NewConfigFromCLI returns the config with a specified CLI command

func NewConfigFromFile

func NewConfigFromFile(filepath string) (Config, error)

NewConfigFromFile returns the config specified from the filepath

type Consumer

type Consumer interface {
	// Close all resources needed to receive messages.
	io.Closer

	// Receive a message from the queue.
	// Returns `nil, nil` if the specified context completes before a message is received from the queue.
	Receive(ctx context.Context) (Message, error)
}

Consumer provides a means to receive messages from a Queue.

type ErrorEvent

type ErrorEvent = kafka.ErrorEvent

ErrorEvent signifies that delivery of a message has failed

type Event

type Event = kafka.Event

Event is a delivery notification event that is returned on the `deliveryCh` provided to Producer.SendAsync (see below)

type Message

type Message interface {

	// Payload returns the message payload
	Payload() []byte

	// Ack notifies that message processing has been completed.
	// This message will never be delivered again.
	Ack() error

	// Nack notifies that message processing has failed.
	// This message will never be delivered again.
	Nack() error

	// Interrupt notifies that message processing has been interrupted.
	// The message will be made visible to other Queue Consumers.
	Interrupt() error
}

Message represents a message received by a Queue Consumer

type MessageDeliveredEvent

type MessageDeliveredEvent = kafka.MessageDeliveredEvent

MessageDeliveredEvent signifies that a message has been delivered

type Producer

type Producer interface {
	// Close all resources needed to send messages.
	io.Closer

	// Send a message to the queue.
	// This is a synchronous, blocking call.
	Send(payload []byte) error

	// Send a message asynchronously to the queue with delivery notifications
	// returned on the specified channel
	SendAsync(payload []byte, deliveryCh chan Event)

	// Flush all messages queued for send and block until either all have been sent
	// or the specified context completes, whichever comes first.
	Flush(ctx context.Context)
}

Producer provides a means to sending messages to a Queue.

type Queue

type Queue interface {
	// Producer returns a new Producer instance that is capable of sending messages to this Queue.
	// The returned Producer must be closed to release resources after it is used.
	Producer() (Producer, error)

	// Consumer returns a new Consumer instance that is capable of receiving messages from this Queue.
	// The returned Consumer must be closed to release resources after it is used.
	Consumer() (Consumer, error)
}

Queue encapsulates the transport of messages sent by Producers and received by Consumers

func NewQueue

func NewQueue(queueID string, config Config, kafkaClientFactory kafka.ClientFactory) Queue

NewQueue allocates a new kafkaQueue instance that can be used to produce and consume messages

type RedeliveryTrackerConfig

type RedeliveryTrackerConfig struct {
	UseNowIfNoMarkerSeen time.Duration // controls when to use the local time of the redelivery tracker to determine what messages require redelivery
	NumOffsetsPerCommit  uint          // the number of markers that can be consumed between offset commits
}

RedeliveryTrackerConfig contain the configuration options for the Kafka-based queue redelivery tracker service

type Service

type Service interface {
	// GetOrCreateQueue returns a `Queue` identified by the specified id.
	GetOrCreateQueue(id string) Queue
}

Service provides a means to create or get a `Queue`

func NewService

func NewService(kafkaClientFactory kafka.ClientFactory, config Config) (Service, error)

NewService returns a Service instance backed by Kafka

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL