internal

package
v0.0.0-...-8f4c080 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2022 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsReconnectRequired

func IsReconnectRequired(err error) bool

IsReconnectRequired returns true if the error specified requires the Kafka client to be reconnected

func IsTimeoutError

func IsTimeoutError(err error) bool

IsTimeoutError returns true if the error specified indicates a Kafka client timeout

Types

type MarkerProducer

type MarkerProducer interface {
	// SendStartMarker synchronously sends a start marker for the specified kafka message and redelivery timeout to the markers topic.
	// This marks the beginning of processing for the message and sets the redelivery deadline.
	SendStartMarker(queueID string, msg *confluentKafka.Message, redeliveryTimeout time.Duration) error

	// SendKeepAliveMarker synchronously sends a keep-alive marker for the specified kafka message and redelivery timeout to the markers topic.
	// This marks processing of the message as still in progress and refreshes the redelivery deadline.
	SendKeepAliveMarker(queueID string, msg *confluentKafka.Message, redeliveryTimeout time.Duration) error

	// SendEndMarker synchronously sends a end marker for the specified kafka message to the markers topic.
	// This marks the end of processing for the message.
	SendEndMarker(queueID string, msg *confluentKafka.Message) error

	// Close tears down the resources associated with this marker producer; it is unusable after this call.
	Close() error
}

MarkerProducer is an interface that supports the sending marker messages to the `MarkersTopic` in support of the kafka-based message queue protocol

func NewMarkerProducer

func NewMarkerProducer(clients kafka.ClientFactory, config MarkerProducerConfig) (MarkerProducer, error)

NewMarkerProducer returns a handle to an open `MarkerProducer` instance.

type MarkerProducerConfig

type MarkerProducerConfig struct {
	BootstrapServers string // kafka cluster connection information
	Topic            string // the name of the topic that stores markers
}

MarkerProducerConfig is a bundle of configuration for a `MarkerProducer` instance

type MessageSender

type MessageSender interface {
	// SendMessage sends a message containing `payload` to the kafka-based queue with name `queueID`
	SendMessage(queueID string, payload []byte) error

	// SendMessageAsync sends a message asynchronously containing `payload` to the kafka-based queue with name `queueID`
	SendMessageAsync(queueID string, payload []byte, deliveryCh chan kafka.Event)

	// Flush flushes all pending messages
	Flush(ctx context.Context)

	// Close tears down the resources associated with this sender; it is unusable after this call.
	Close() error
}

MessageSender supports sending a message to a specific kafka-based queue It is used by both the `queue.Producer` and `redelivery.Redeliverer` implementations

func NewMessageSender

func NewMessageSender(bootstrapServers string, topic string, kafkaClientFactory kafka.ClientFactory) (MessageSender, error)

NewMessageSender creates a new `Sender` instance that is capable of sending kafka-based queue messages to the specified topic.

type MqClient

type MqClient interface {
	// NextMessage returns the next message from the queue.
	NextMessage(timeout time.Duration) (*confluentKafka.Message, error)

	// ProcessingInProgress indicates that processing of `msg` is in progress by sending a keep-alive marker to the `MarkerTopic`
	ProcessingInProgress(msg *confluentKafka.Message) error

	// Processed indicates that processing of `msg` is complete by sending an end marker to the `MarkerTopic`
	Processed(msg *confluentKafka.Message) error

	// NeetsReset returns true if this mqclient is tainted and must be reset.
	// An mqclient becomes tainted when the current consumer offset and committed consumer offset for the message topic
	// become inconsistent with the markers that have been written to the marker topic.
	// Note 1: At any point in time it should be possible to reason about the contents of the marker topic by inspecting:
	// - the current consumer offset
	// - the committed consumer offset
	// Note 2: The opposite is also true; we should be able to reason about the contents current offsets in the message
	//         topic by inspecting the contents of the marker topic.
	NeedsReset() bool

	// Reset resets this mqclient back to the last message queue offset commited to Kafka
	// This function clears tainted state by ensuring that:
	// 1. the next message read from the kafka consumer will be from last committed offset
	// 2. the next marker written will be for the message read in 1.
	Reset() error

	// NeedsReconnect returns true if the underlying kafka consumer has become disconnected
	// (e.g due to idle timeout expiry) and requires reconnect
	NeedsReconnect() bool

	// Reconnect reconnects the underlying kafka consumer
	Reconnect() error

	// Close tears down the resources associated with this mqclient; it is unusable after this call.
	Close() error
}

MqClient is an interface for a Kafka-based message queue client.

A call to `NextMessage()` will:

1. read a message from the `MessageTopic` 2. send a start marker for the message to the `MarkerTopic` and wait until they are written 3. commit read offsets of the message `MessageTopic`

The next step of the message flow - 4. processing the messages - should be done by the client.

Optionally, while the message is being processed, the `ProcessingInProgress()` method can be called which will: 5. send a keep-alive marker to the `MarkerTopic`

After the message is processed, the `Processed()` method should be called which will: 6. send an end marker to the `MarkerTopic`

Note that both `ProcessingInProgress()` and `Processed()` can be called at any time for any message and out-of-order.

In the event that message processing fails and should be retried the message must be re-delivered, and neither `ProcessingInProgress()` nor `Processed()` should be called.

func NewMqClient

func NewMqClient(clients kafka.ClientFactory, config MqClientConfig, queueID string) (MqClient, error)

NewMqClient returns a handle to an open `MqClient` instance.

type MqClientConfig

type MqClientConfig struct {
	BootstrapServers     string        // kafka cluster connection information
	MessageTopic         string        // the name of the topic that stores messages
	MarkerTopic          string        // the name of the topic that stores markers
	RedeliveryTimeout    time.Duration // time after which messages should be redelivered
	MaxMessagesPerCommit uint          // the maximum number of messages consumed before offsets are committed
}

MqClientConfig is a bundle of configuration for an `MqClient` instance

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL