Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignedPartition ¶
type AssignedPartition interface { // AddMessage adds a marker message to this partition. // `now` is the local, monotonic clock time AddMessage(msg *confluentKafka.Message, now time.Time) error // SmallestMarkerOffset returns the smallest offset of all marker messages currently being tracked SmallestMarkerOffset() (int64, bool) // Redeliver determines which messages have expired redelivery deadlines and initiates redelivery. // `now` is the local, monotonic clock time Redeliver(now time.Time) // Close releases the resources consumed by this partition instance Close() }
AssignedPartition allows the marker consumer to track consumption progress as well as which messages require redelivery for a specific partition in the marker topic
func NewAssignedPartition ¶
func NewAssignedPartition(markersQueue MarkersQueue, redeliverer Redeliverer, durationUseNowIfNoMarkerSeen time.Duration) AssignedPartition
NewAssignedPartition constructs a new AssignedPartiton instance
type MarkerConsumer ¶
type MarkerConsumer interface { // Start the `MarkerConsumer` Start() // Stop the `MarkerConsumer`, it is unusable after it is stopped Stop() }
MarkerConsumer encapsulates the ability to consume markers from the markers topic and redeliver messages redelivery deadlines have expired
func NewMarkerConsumer ¶
func NewMarkerConsumer(config queue.Config, clients kafka.ClientFactory) (MarkerConsumer, error)
NewMarkerConsumer constructs a new `MarkerConsumer` instance
type MarkerDeadline ¶
type MarkerDeadline struct { MessageID MessageID RedeliveryDeadline time.Time // contains filtered or unexported fields }
MarkerDeadline associates a message with its redelivery deadline
type MarkerDeadlinesPQ ¶
type MarkerDeadlinesPQ struct {
// contains filtered or unexported fields
}
MarkerDeadlinesPQ is a priority queue to track messages by their redelivery deadline in ascending order
func NewMarkerDeadlinesPQ ¶
func NewMarkerDeadlinesPQ() *MarkerDeadlinesPQ
NewMarkerDeadlinesPQ constructs a new `MarkerDeadlinesPQ` instance.
func (*MarkerDeadlinesPQ) Dequeue ¶
func (pq *MarkerDeadlinesPQ) Dequeue() (MarkerDeadline, bool)
Dequeue removes the `MarkerDeadline` item with the earliest redelivery deadline from the head of the priotity queue
func (*MarkerDeadlinesPQ) Enqueue ¶
func (pq *MarkerDeadlinesPQ) Enqueue(markerDeadline MarkerDeadline) bool
Enqueue adds a new `MarkerDeadline` item to the priotity queue
func (*MarkerDeadlinesPQ) Head ¶
func (pq *MarkerDeadlinesPQ) Head() (MarkerDeadline, bool)
Head peeks at the `MarkerDeadline` item with the earliest redelivery deadline at the head of the queue Returns `(MarkerDeadline{}, false)` if the queue is empty.
type MarkerID ¶
type MarkerID = RecordID
MarkerID uniquely identifies a record from the marker topic
func NewMarkerID ¶
NewMarkerID constructs an id that uniquely identifies a marker record.
type MarkerOffset ¶
MarkerOffset associates a message with the location of the marker that tracks it in the maker topic
type MarkerOffsetsPQ ¶
type MarkerOffsetsPQ struct {
// contains filtered or unexported fields
}
MarkerOffsetsPQ is a priority queue to track markers by their offset in the marker topic in ascending order
func NewMarkerOffsetsPQ ¶
func NewMarkerOffsetsPQ() *MarkerOffsetsPQ
NewMarkerOffsetsPQ constructs a new `MarkerOffsetsPQ` instance.
func (*MarkerOffsetsPQ) Dequeue ¶
func (pq *MarkerOffsetsPQ) Dequeue() (MarkerOffset, bool)
Dequeue removes the `MarkerOffset` item with the smallest offset value from the head of the priotity queue
func (*MarkerOffsetsPQ) Enqueue ¶
func (pq *MarkerOffsetsPQ) Enqueue(markerOffset MarkerOffset) bool
Enqueue adds a new `MarkerOffset` item to the priotity queue
func (*MarkerOffsetsPQ) Head ¶
func (pq *MarkerOffsetsPQ) Head() (MarkerOffset, bool)
Head peeks at the `MarkerOffset` item with the smallest offset value at the head of the queue Returns `(MarkerOffset{}, false)` if the queue is empty.
type MarkersQueue ¶
type MarkersQueue interface { // Partition returns the ordinal of the partition that the queue is tracking Partition() int32 // AddMarker adds a marker to the queue. AddMarker(offset int64, timestamp time.Time, marker *kafkamq.Marker) // MarkersToRedelivery returns the markers that require redelivery given the specified timestamp. MarkersToRedeliver(now time.Time) []*kafkamq.Marker // SmallestMarkerOffset returns the smallest marker offset tracked by the queue. // Returns `(0, false)` if the queue is empty. SmallestMarkerOffset() (int64, bool) }
MarkersQueue is the datastructure used to track Markers consumed from the markers topic that determines:
- Which messages currently are in-progress, including their payload.
- Which messages require redelivery (based on marker redelivery deadlines)
- The offset up to which markers have been consumed from the markers topic; this offset is used when committing marker consumer offsets to Kafka.
Note that the MarkersQueue is not thread-safe as it is only meant to be used by a single Kafka consumer thread.
func NewMarkersQueue ¶
func NewMarkersQueue(partition int32) MarkersQueue
NewMarkersQueue constructs a new MarkersQueue instance
type MessageID ¶
type MessageID = RecordID
MessageID uniquely identifies a record from the queue topic
func MessageIDFromMarker ¶
MessageIDFromMarker returns an identifier that uniquely identifies the message record from the queue topic that the specified `marker` tracks.
func NewMessageID ¶
NewMessageID constructs an id that uniquely identifies a message record.
type OffsetCommitter ¶
type OffsetCommitter interface { // AddMarker registers a marker to be committed AddMarker(MarkerID) // DropOffsets drops any cached offsets for the specified partition DropOffsets(partition int32) }
OffsetCommitter allows the redelivery tracker to periodically commit batches of marker offsets to Kafka
func NewOffsetCommitter ¶
func NewOffsetCommitter(markerTopic string, numOffsetsPerCommit uint, kConsumer kafka.Consumer) OffsetCommitter
NewOffsetCommitter constructs a new OffsetCommitter instance that can be used to periodically commit consumer offsets for the specified `markerTopic`. `numOffsetsPerCommit` is the number of offsets that must be added before a commit is performed.
type RecordID ¶
type RecordID struct {
// contains filtered or unexported fields
}
RecordID uniqely identifies a record in a kafka topic by using the record's location (partition:offset)
type Redeliverer ¶
Redeliverer provides functionality to redeliver messages associated with markers whose deadlines have expired.
with the mechanics of message redelivery.
func NewRedeliverer ¶
func NewRedeliverer(partition int32, messageSender internal.MessageSender, markerProducer internal.MarkerProducer, backoffPolicy backoff.BackOff) Redeliverer
NewRedeliverer constructs a `Redeliverer` instance that can be used to redeliver messages associated with markers whose deadlines have expired.