Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsReconnectRequired ¶
IsReconnectRequired returns true if the specified error requires the Kafka client to be reconnected.
func IsTimeoutError ¶
IsTimeoutError returns true if the specified error indicates a Kafka client timeout.
Types ¶
type MarkerProducer ¶
type MarkerProducer interface {
	// SendStartMarker synchronously sends a start marker for the specified kafka message and redelivery timeout to the markers topic.
	// This marks the beginning of processing for the message and sets the redelivery deadline.
	SendStartMarker(queueID string, msg *confluentKafka.Message, redeliveryTimeout time.Duration) error

	// SendKeepAliveMarker synchronously sends a keep-alive marker for the specified kafka message and redelivery timeout to the markers topic.
	// This marks processing of the message as still in progress and refreshes the redelivery deadline.
	SendKeepAliveMarker(queueID string, msg *confluentKafka.Message, redeliveryTimeout time.Duration) error

	// SendEndMarker synchronously sends an end marker for the specified kafka message to the markers topic.
	// This marks the end of processing for the message.
	SendEndMarker(queueID string, msg *confluentKafka.Message) error

	// Close tears down the resources associated with this marker producer; it is unusable after this call.
	Close() error
}
MarkerProducer is an interface that supports sending marker messages to the `MarkersTopic` as part of the Kafka-based message queue protocol.
func NewMarkerProducer ¶
func NewMarkerProducer(clients kafka.ClientFactory, config MarkerProducerConfig) (MarkerProducer, error)
NewMarkerProducer returns a handle to an open `MarkerProducer` instance.
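The marker lifecycle described by the interface comments (start, optional keep-alive, end) can be sketched as follows. This is a minimal sketch under stated assumptions, not the package's implementation: `Message` stands in for `*confluentKafka.Message` so the example compiles without the Kafka client library, `markerSender` mirrors only the three send methods of `MarkerProducer`, and `recordingProducer`, `processWithMarkers`, and `errWorkFailed` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message is a stand-in for *confluentKafka.Message (assumption).
type Message struct{ Value []byte }

// markerSender mirrors the three send methods of MarkerProducer,
// with Message substituted for the real Kafka message type.
type markerSender interface {
	SendStartMarker(queueID string, msg *Message, redeliveryTimeout time.Duration) error
	SendKeepAliveMarker(queueID string, msg *Message, redeliveryTimeout time.Duration) error
	SendEndMarker(queueID string, msg *Message) error
}

// recordingProducer is a hypothetical fake that records which markers were sent.
type recordingProducer struct{ sent []string }

func (p *recordingProducer) SendStartMarker(string, *Message, time.Duration) error {
	p.sent = append(p.sent, "start")
	return nil
}

func (p *recordingProducer) SendKeepAliveMarker(string, *Message, time.Duration) error {
	p.sent = append(p.sent, "keep-alive")
	return nil
}

func (p *recordingProducer) SendEndMarker(string, *Message) error {
	p.sent = append(p.sent, "end")
	return nil
}

// errWorkFailed is a hypothetical error used to simulate failed processing.
var errWorkFailed = errors.New("work failed")

// processWithMarkers sends a start marker, runs work, and sends an end marker
// only if work succeeds. work receives a keepAlive callback that sends a
// keep-alive marker for the same message.
func processWithMarkers(p markerSender, queueID string, msg *Message,
	timeout time.Duration, work func(keepAlive func() error) error) error {
	if err := p.SendStartMarker(queueID, msg, timeout); err != nil {
		return fmt.Errorf("start marker: %w", err)
	}
	keepAlive := func() error { return p.SendKeepAliveMarker(queueID, msg, timeout) }
	if err := work(keepAlive); err != nil {
		// No end marker is sent, so the message stays marked as in progress.
		return err
	}
	return p.SendEndMarker(queueID, msg)
}

func main() {
	p := &recordingProducer{}
	msg := &Message{Value: []byte("payload")}
	err := processWithMarkers(p, "orders", msg, 30*time.Second,
		func(keepAlive func() error) error {
			return keepAlive() // long-running work could call this periodically
		})
	fmt.Println(p.sent, err)
}
```

With the real `MarkerProducer`, the same call order would apply, but each send is a synchronous write to the markers topic and can fail.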
type MarkerProducerConfig ¶
type MarkerProducerConfig struct {
	BootstrapServers string // kafka cluster connection information
	Topic            string // the name of the topic that stores markers
}
MarkerProducerConfig is a bundle of configuration for a `MarkerProducer` instance
type MessageSender ¶
type MessageSender interface {
	// SendMessage sends a message containing `payload` to the kafka-based queue with name `queueID`
	SendMessage(queueID string, payload []byte) error

	// SendMessageAsync asynchronously sends a message containing `payload` to the kafka-based queue with name `queueID`
	SendMessageAsync(queueID string, payload []byte, deliveryCh chan kafka.Event)

	// Flush flushes all pending messages
	Flush(ctx context.Context)

	// Close tears down the resources associated with this sender; it is unusable after this call.
	Close() error
}
MessageSender supports sending a message to a specific Kafka-based queue. It is used by both the `queue.Producer` and `redelivery.Redeliverer` implementations.
func NewMessageSender ¶
func NewMessageSender(bootstrapServers string, topic string, kafkaClientFactory kafka.ClientFactory) (MessageSender, error)
NewMessageSender creates a new `MessageSender` instance that is capable of sending Kafka-based queue messages to the specified topic.
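Asynchronous sending with a delivery channel can be sketched as below. This is an illustration under assumptions rather than the package's behaviour: `Event` stands in for `kafka.Event`, `asyncSender` mirrors only `SendMessageAsync`, and `fakeSender` and `sendBatch` are hypothetical. In particular, the sketch assumes that exactly one event arrives on `deliveryCh` per message sent, which the source does not state.

```go
package main

import "fmt"

// Event is a stand-in for kafka.Event (assumption).
type Event interface{}

// asyncSender mirrors MessageSender.SendMessageAsync, with Event substituted
// for the real event type.
type asyncSender interface {
	SendMessageAsync(queueID string, payload []byte, deliveryCh chan Event)
}

// fakeSender is a hypothetical sender that reports delivery immediately.
type fakeSender struct{}

func (fakeSender) SendMessageAsync(queueID string, payload []byte, deliveryCh chan Event) {
	deliveryCh <- fmt.Sprintf("delivered %d bytes to %s", len(payload), queueID)
}

// sendBatch sends every payload asynchronously, then waits for one delivery
// event per payload (assumed: one event per message).
func sendBatch(s asyncSender, queueID string, payloads [][]byte) []Event {
	// Buffered so that delivery reports never block the sender.
	deliveryCh := make(chan Event, len(payloads))
	for _, p := range payloads {
		s.SendMessageAsync(queueID, p, deliveryCh)
	}
	events := make([]Event, 0, len(payloads))
	for range payloads {
		events = append(events, <-deliveryCh)
	}
	return events
}

func main() {
	events := sendBatch(fakeSender{}, "orders", [][]byte{[]byte("a"), []byte("bc")})
	for _, e := range events {
		fmt.Println(e)
	}
}
```

A caller of the real `MessageSender` that does not need per-message delivery reports could instead rely on `Flush(ctx)` to wait for pending messages.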
type MqClient ¶
type MqClient interface {
	// NextMessage returns the next message from the queue.
	NextMessage(timeout time.Duration) (*confluentKafka.Message, error)

	// ProcessingInProgress indicates that processing of `msg` is in progress by sending a keep-alive marker to the `MarkerTopic`
	ProcessingInProgress(msg *confluentKafka.Message) error

	// Processed indicates that processing of `msg` is complete by sending an end marker to the `MarkerTopic`
	Processed(msg *confluentKafka.Message) error

	// NeedsReset returns true if this mqclient is tainted and must be reset.
	// An mqclient becomes tainted when the current consumer offset and committed consumer offset for the message topic
	// become inconsistent with the markers that have been written to the marker topic.
	// Note 1: At any point in time it should be possible to reason about the contents of the marker topic by inspecting:
	//   - the current consumer offset
	//   - the committed consumer offset
	// Note 2: The opposite is also true; we should be able to reason about the current offsets in the message
	// topic by inspecting the contents of the marker topic.
	NeedsReset() bool

	// Reset resets this mqclient back to the last message queue offset committed to Kafka.
	// This function clears tainted state by ensuring that:
	//   1. the next message read from the kafka consumer will be from the last committed offset
	//   2. the next marker written will be for the message read in 1.
	Reset() error

	// NeedsReconnect returns true if the underlying kafka consumer has become disconnected
	// (e.g. due to idle timeout expiry) and requires reconnect
	NeedsReconnect() bool

	// Reconnect reconnects the underlying kafka consumer
	Reconnect() error

	// Close tears down the resources associated with this mqclient; it is unusable after this call.
	Close() error
}
MqClient is an interface for a Kafka-based message queue client.
A call to `NextMessage()` will:
1. read a message from the `MessageTopic`
2. send a start marker for the message to the `MarkerTopic` and wait until it is written
3. commit the read offsets for the `MessageTopic`

The next step of the message flow (4. processing the message) is performed by the calling code.

Optionally, while the message is being processed, the `ProcessingInProgress()` method can be called, which will:
5. send a keep-alive marker to the `MarkerTopic`

After the message is processed, the `Processed()` method should be called, which will:
6. send an end marker to the `MarkerTopic`

Note that both `ProcessingInProgress()` and `Processed()` can be called at any time, for any message, and in any order.

If message processing fails and should be retried, the message must be redelivered, so neither `ProcessingInProgress()` nor `Processed()` should be called for it.
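The message flow above can be sketched as a consume loop. This is a minimal sketch under stated assumptions: `Message` stands in for `*confluentKafka.Message`, `queueClient` mirrors only three methods of `MqClient`, and `fakeClient`, `drain`, and `errNoMessage` are hypothetical names that do not exist in the package. The fake simply stops when it runs out of messages; a real loop would inspect the error from `NextMessage`, for example with `IsTimeoutError`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message is a stand-in for *confluentKafka.Message (assumption).
type Message struct {
	Value []byte
}

// queueClient mirrors the subset of MqClient used below, with Message
// substituted for the real Kafka message type.
type queueClient interface {
	NextMessage(timeout time.Duration) (*Message, error)
	ProcessingInProgress(msg *Message) error
	Processed(msg *Message) error
}

// errNoMessage is a hypothetical error returned when the fake queue is empty.
var errNoMessage = errors.New("no message available")

// fakeClient is a hypothetical in-memory client that records end markers.
type fakeClient struct {
	pending []*Message
	ended   []string
}

func (c *fakeClient) NextMessage(timeout time.Duration) (*Message, error) {
	if len(c.pending) == 0 {
		return nil, errNoMessage
	}
	msg := c.pending[0]
	c.pending = c.pending[1:]
	return msg, nil
}

func (c *fakeClient) ProcessingInProgress(msg *Message) error { return nil }

func (c *fakeClient) Processed(msg *Message) error {
	c.ended = append(c.ended, string(msg.Value))
	return nil
}

// drain reads messages until NextMessage returns an error. It calls Processed
// only when handle succeeds; on failure it sends no marker for the message,
// leaving it to be redelivered.
func drain(c queueClient, handle func(*Message) error) (handled, failed int) {
	for {
		msg, err := c.NextMessage(100 * time.Millisecond)
		if err != nil {
			return handled, failed
		}
		if err := handle(msg); err != nil {
			failed++
			continue
		}
		if err := c.Processed(msg); err != nil {
			failed++
			continue
		}
		handled++
	}
}

func main() {
	c := &fakeClient{pending: []*Message{
		{Value: []byte("a")}, {Value: []byte("bad")}, {Value: []byte("c")},
	}}
	handled, failed := drain(c, func(m *Message) error {
		if string(m.Value) == "bad" {
			return errors.New("processing failed")
		}
		return nil
	})
	fmt.Println(handled, failed, c.ended)
}
```

A handler that runs for a long time would also call `ProcessingInProgress` while it works, so that the keep-alive marker refreshes the redelivery deadline.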
func NewMqClient ¶
func NewMqClient(clients kafka.ClientFactory, config MqClientConfig, queueID string) (MqClient, error)
NewMqClient returns a handle to an open `MqClient` instance.
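Between reads, a caller can check the client's health using the `NeedsReconnect()` and `NeedsReset()` methods from the interface. The sketch below shows one possible recovery helper. It is illustrative only: `recoverable` mirrors four methods of `MqClient`, while `flakyClient`, `recoverClient`, and `errReconnectFailed` are hypothetical, and the order used here (reconnect first, then reset) is an assumption that the source does not specify.

```go
package main

import (
	"errors"
	"fmt"
)

// recoverable mirrors the recovery-related methods of MqClient.
type recoverable interface {
	NeedsReset() bool
	Reset() error
	NeedsReconnect() bool
	Reconnect() error
}

// errReconnectFailed is a hypothetical error used by the fake below.
var errReconnectFailed = errors.New("reconnect failed")

// flakyClient is a hypothetical fake whose state can be set by the caller.
type flakyClient struct {
	tainted       bool
	disconnected  bool
	failReconnect bool
	calls         []string
}

func (c *flakyClient) NeedsReset() bool     { return c.tainted }
func (c *flakyClient) NeedsReconnect() bool { return c.disconnected }

func (c *flakyClient) Reset() error {
	c.calls = append(c.calls, "reset")
	c.tainted = false
	return nil
}

func (c *flakyClient) Reconnect() error {
	if c.failReconnect {
		return errReconnectFailed
	}
	c.calls = append(c.calls, "reconnect")
	c.disconnected = false
	return nil
}

// recoverClient reconnects a disconnected client and resets a tainted one.
// Assumption: reconnecting before resetting is acceptable.
func recoverClient(c recoverable) error {
	if c.NeedsReconnect() {
		if err := c.Reconnect(); err != nil {
			return fmt.Errorf("reconnect: %w", err)
		}
	}
	if c.NeedsReset() {
		if err := c.Reset(); err != nil {
			return fmt.Errorf("reset: %w", err)
		}
	}
	return nil
}

func main() {
	c := &flakyClient{tainted: true, disconnected: true}
	fmt.Println(recoverClient(c), c.calls)
}
```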
type MqClientConfig ¶
type MqClientConfig struct {
	BootstrapServers     string        // kafka cluster connection information
	MessageTopic         string        // the name of the topic that stores messages
	MarkerTopic          string        // the name of the topic that stores markers
	RedeliveryTimeout    time.Duration // time after which messages should be redelivered
	MaxMessagesPerCommit uint          // the maximum number of messages consumed before offsets are committed
}
MqClientConfig is a bundle of configuration for an `MqClient` instance
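As an illustration of how these fields fit together, the sketch below fills in a configuration and applies a few sanity checks before it would be passed to `NewMqClient`. Everything here is an assumption made for the example: `clientConfig` is a local mirror of `MqClientConfig`, the broker address and topic names are placeholders, and the rules in `validate` are hypothetical; the source does not say what `NewMqClient` itself checks.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// clientConfig is a local mirror of MqClientConfig so the sketch compiles
// without the package (assumption).
type clientConfig struct {
	BootstrapServers     string
	MessageTopic         string
	MarkerTopic          string
	RedeliveryTimeout    time.Duration
	MaxMessagesPerCommit uint
}

// exampleConfig returns placeholder values; none come from the source.
func exampleConfig() clientConfig {
	return clientConfig{
		BootstrapServers:     "localhost:9092",
		MessageTopic:         "orders",
		MarkerTopic:          "orders.markers",
		RedeliveryTimeout:    30 * time.Second,
		MaxMessagesPerCommit: 100,
	}
}

// validate applies hypothetical sanity rules to a configuration.
func validate(cfg clientConfig) error {
	switch {
	case cfg.BootstrapServers == "":
		return errors.New("BootstrapServers must not be empty")
	case cfg.MessageTopic == "" || cfg.MarkerTopic == "":
		return errors.New("MessageTopic and MarkerTopic must not be empty")
	case cfg.MessageTopic == cfg.MarkerTopic:
		return errors.New("MessageTopic and MarkerTopic must differ")
	case cfg.RedeliveryTimeout <= 0:
		return errors.New("RedeliveryTimeout must be positive")
	}
	return nil
}

func main() {
	cfg := exampleConfig()
	fmt.Println(validate(cfg))
}
```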