Documentation ¶
Index ¶
- type KafkaConsumer
- type Metrics
- type PartitionStats
- type RecoveryConsumer
- type RecoveryRequest
- type RecoveryRequests
- type RecoveryTracker
- func (rt *RecoveryTracker) AddRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error
- func (rt *RecoveryTracker) GetRecoveryRequest(partitionID int32) *RecoveryRequest
- func (rt *RecoveryTracker) MarkRecoveryComplete(partitionID int32, toOffset int64) error
- func (rt *RecoveryTracker) RecoveryRequestCount() int
- func (rt *RecoveryTracker) Shutdown()
- func (rt *RecoveryTracker) UpdateRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConsumer ¶
type KafkaConsumer struct {
	fbcontext.ContextAware
	// contains filtered or unexported fields
}
KafkaConsumer is a firebolt source that receives records from a Kafka topic.
func (*KafkaConsumer) GetMetrics ¶
func (k *KafkaConsumer) GetMetrics() *Metrics
GetMetrics returns the Metrics instance used by this KafkaConsumer.
func (*KafkaConsumer) Receive ¶
func (k *KafkaConsumer) Receive(msg fbcontext.Message) error
Receive handles a message from another node or an external source
func (*KafkaConsumer) Setup ¶
Setup instantiates and configures the underlying Kafka consumer client
func (*KafkaConsumer) Shutdown ¶
func (k *KafkaConsumer) Shutdown() error
Shutdown stops the Kafka consumer client
func (*KafkaConsumer) Start ¶
func (k *KafkaConsumer) Start() error
Start subscribes the Kafka consumer client to the configured topic and starts reading records from the consumer's channel
type Metrics ¶
type Metrics struct {
	EventsConsumed     *prometheus.CounterVec
	StoredOffset       *prometheus.GaugeVec
	LowWatermark       *prometheus.GaugeVec
	HighWatermark      *prometheus.GaugeVec
	ConsumerLag        *prometheus.GaugeVec
	RecoveryEvents     *prometheus.CounterVec
	RecoveryRemaining  *prometheus.GaugeVec
	RecoveryPartitions prometheus.Gauge
}
Metrics encapsulates the prometheus collectors used to record metrics about the consumer
func (*Metrics) RegisterConsumerMetrics ¶
func (m *Metrics) RegisterConsumerMetrics()
RegisterConsumerMetrics initializes gauges for tracking consumer state and registers them with the prometheus client
func (*Metrics) UpdateConsumerMetrics ¶
UpdateConsumerMetrics parses the JSON stats reported by the librdkafka consumer and updates the prometheus gauges accordingly.
type PartitionStats ¶
type PartitionStats struct {
// contains filtered or unexported fields
}
PartitionStats is a struct for holding the statistics emitted by the librdkafka consumer that underlies confluent-kafka-go after they are parsed from their original JSON
type RecoveryConsumer ¶
type RecoveryConsumer struct {
// contains filtered or unexported fields
}
RecoveryConsumer is a rate-limited consumer that reprocesses any records that were missed during an outage. When parallel recovery is enabled, if KafkaConsumer is initially assigned a partition whose high watermark is too far ahead of the stored offset (as defined by config 'maxpartitionlag'), there will be a gap of missed records between the old stored offset and (highwatermark - maxpartitionlag). KafkaConsumer will request recovery for that partition; those requests are managed by RecoveryTracker.
The kafkaconsumer's partition assignments, which are auto-balanced, drive the recoveryconsumer as well. There are two orders of operation for starting recovery for a partition:
- *Initial Request, Partition Assignment Exists First*: kafkaconsumer is assigned a partition, detects a recovery condition, and requests recovery; the call to RequestRecovery() synchronously assigns that partition within RecoveryConsumer
- *Rebalance Assignment, Recovery Request Exists First*: kafkaconsumer is assigned a partition and notifies recoveryconsumer by calling AssignPartitions; recoveryconsumer fetches the recovery request for the partition from recoverytracker and assigns the partition within recoveryconsumer
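The gap described above can be sketched as a small calculation. This is only an illustration: recoveryRange is a hypothetical helper, not part of this package, and whether the real range treats the upper offset as inclusive or exclusive is not stated here.

```go
package main

import "fmt"

// recoveryRange is a hypothetical helper, not part of the package API.
// It reports the offset range that would need recovery when a partition is
// assigned, assuming the gap runs from the stored offset up to
// (highWatermark - maxPartitionLag) as described in the text.
func recoveryRange(storedOffset, highWatermark, maxPartitionLag int64) (from, to int64, needed bool) {
	target := highWatermark - maxPartitionLag
	if target <= storedOffset {
		// The stored offset is within the allowed lag; nothing is skipped.
		return 0, 0, false
	}
	return storedOffset, target, true
}

func main() {
	// Stored offset 1000, high watermark 50000, maxpartitionlag 10000.
	from, to, needed := recoveryRange(1000, 50000, 10000)
	fmt.Println(from, to, needed) // 1000 40000 true
}
```

In this sketch the main consumer would resume near offset 40000 while the range from 1000 to 40000 is handed to recovery.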
Recovery is rate limited by config 'parallelrecoverymaxrate'. This is the total number of events per second that recovery will process; set it low enough that downstream systems are not overwhelmed during recovery.
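As a rough illustration of capping events per second, the sketch below uses a fixed one-second window with an injected clock. The windowLimiter type and the at helper are hypothetical; the limiter actually used by RecoveryConsumer is unexported and may work differently.

```go
package main

import (
	"fmt"
	"time"
)

// windowLimiter is a hypothetical fixed-window limiter that admits at most
// perSecond events in each one-second window. It is an illustration only.
type windowLimiter struct {
	perSecond   int
	windowStart time.Time
	used        int
}

func newWindowLimiter(perSecond int, start time.Time) *windowLimiter {
	return &windowLimiter{perSecond: perSecond, windowStart: start}
}

// allow reports whether one more event may be processed at time now.
func (l *windowLimiter) allow(now time.Time) bool {
	if now.Sub(l.windowStart) >= time.Second {
		// A new window begins; reset the counter.
		l.windowStart = now
		l.used = 0
	}
	if l.used >= l.perSecond {
		return false
	}
	l.used++
	return true
}

// at returns a fixed instant ms milliseconds after an arbitrary epoch, so the
// example does not depend on the wall clock.
func at(ms int64) time.Time {
	return time.Unix(0, 0).Add(time.Duration(ms) * time.Millisecond)
}

func main() {
	l := newWindowLimiter(2, at(0))
	fmt.Println(l.allow(at(0)), l.allow(at(10)), l.allow(at(20)), l.allow(at(1000)))
	// true true false true
}
```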
There are a few different types of race conditions to worry about here:
- Race conditions on consumer rebalance: one node gets partitions assigned, finds a gap between the stored offset and the high watermark, and creates a request; a consumer rebalance then assigns the same partition to another instance.
  - if the rebalance happens *before* offsets have been saved by the original assignee, a new request will be created that should match the old one or have a slightly higher high watermark (and thus toOffset), so it is fine for it to overwrite the old recovery request
  - if the rebalance happens *after* offsets have been saved, a new request should not be created because the saved offset is within 'maxpartitionlag', and the existing request is still valid
- Race conditions within recovery, between partition assignment to recoveryconsumer and recoveryrequest availability in recoverytracker:
  - on startup, if recoveryrequests already exist, partitions may be assigned in kafkaconsumer before recoverytracker has consumed those existing recoveryrequests, so when recoverytracker *does* consume them recoveryconsumer must update; we use a ticker in recoveryconsumer that refreshes assignments from recoverytracker periodically
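The startup race can be pictured as recomputing, on every refresh, which assigned partitions currently have a known recovery request. The sketch below is a simplified model with hypothetical names (request, partitionsToRecover); it does not reproduce the package's unexported logic.

```go
package main

import "fmt"

// request is a hypothetical stand-in for a tracked recovery request.
type request struct {
	from, to int64
}

// partitionsToRecover returns the assigned partitions that currently have a
// recovery request. Calling it again after the tracker has caught up picks up
// requests that were not yet visible at assignment time.
func partitionsToRecover(assigned []int32, requests map[int32]request) []int32 {
	var out []int32
	for _, p := range assigned {
		if _, ok := requests[p]; ok {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	assigned := []int32{0, 1, 2}
	requests := map[int32]request{}

	// At assignment time the tracker has not yet consumed existing requests.
	fmt.Println(partitionsToRecover(assigned, requests)) // []

	// Later the tracker learns of a request for partition 2; the next
	// periodic refresh sees it.
	requests[2] = request{from: 100, to: 900}
	fmt.Println(partitionsToRecover(assigned, requests)) // [2]
}
```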
What about a case where a recovery request is in progress for an outage, and there's a second outage? Because offsets have been stored, if the second outage was short enough not to generate a new recovery request, the old recovery request will restart. However, if the second outage was long enough to generate a new recovery request, it will clobber the old one, and the old in-progress recovery is abandoned. This shortcoming is worth addressing in a future release. We wouldn't want multiple recoveries running at the same time for a single partition, but each recovery request could be an ordered queue of requests, with overlapping request offset ranges merged automatically.
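The merging idea proposed above is not implemented in this package; the following is only a sketch of how overlapping offset ranges in such a queue might be combined. The offsetRange type and mergeRanges function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// offsetRange is a hypothetical [From, To] offset range awaiting recovery.
type offsetRange struct {
	From, To int64
}

// mergeRanges returns the ranges ordered by From, with any ranges that
// overlap or touch combined into one. The input slice is not modified.
func mergeRanges(queue []offsetRange) []offsetRange {
	if len(queue) == 0 {
		return nil
	}
	sorted := append([]offsetRange(nil), queue...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].From < sorted[j].From })

	merged := []offsetRange{sorted[0]}
	for _, r := range sorted[1:] {
		last := &merged[len(merged)-1]
		if r.From <= last.To {
			// Overlaps the previous range: extend it if needed.
			if r.To > last.To {
				last.To = r.To
			}
			continue
		}
		merged = append(merged, r)
	}
	return merged
}

func main() {
	queue := []offsetRange{{100, 500}, {400, 900}, {1200, 1500}}
	fmt.Println(mergeRanges(queue)) // [{100 900} {1200 1500}]
}
```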
func NewRecoveryConsumer ¶
func NewRecoveryConsumer(topic string, sendCh chan firebolt.Event, config map[string]string, metrics *Metrics, ctx fbcontext.FBContext) (*RecoveryConsumer, error)
NewRecoveryConsumer creates a RecoveryConsumer
func (*RecoveryConsumer) RefreshAssignments ¶
func (rc *RecoveryConsumer) RefreshAssignments() error
RefreshAssignments updates the current partitions that are being recovered when it's known that the set of recoveryrequests managed by RecoveryTracker may have changed.
func (*RecoveryConsumer) RequestRecovery ¶
func (rc *RecoveryConsumer) RequestRecovery(partitionID int32, fromOffset kafka.Offset, toOffset kafka.Offset)
RequestRecovery creates a RecoveryTask to track recovery for a single partition.
func (*RecoveryConsumer) SetAssignedPartitions ¶
func (rc *RecoveryConsumer) SetAssignedPartitions(partitions []kafka.TopicPartition)
SetAssignedPartitions updates the slice of partitions that are currently assigned in kafkaconsumer.
func (*RecoveryConsumer) Shutdown ¶
func (rc *RecoveryConsumer) Shutdown()
Shutdown stops the recovery consumer
type RecoveryRequest ¶
type RecoveryRequest struct {
	PartitionID int32     `json:"partition_id"`
	FromOffset  int64     `json:"from_offset"`
	ToOffset    int64     `json:"to_offset"`
	Created     time.Time `json:"created"`
}
RecoveryRequest is a request for recovery to fill in missed data for a single partition.
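The struct tags determine how a request looks when serialized as JSON. The sketch below declares a local copy of the struct (so that it runs without importing this package) and round-trips a sample value; sampleRequest, encode and decode are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// RecoveryRequest is a local copy of the documented struct, declared here
// only so the example is self-contained.
type RecoveryRequest struct {
	PartitionID int32     `json:"partition_id"`
	FromOffset  int64     `json:"from_offset"`
	ToOffset    int64     `json:"to_offset"`
	Created     time.Time `json:"created"`
}

// sampleRequest returns a fixed request so the output is reproducible.
func sampleRequest() RecoveryRequest {
	return RecoveryRequest{
		PartitionID: 3,
		FromOffset:  100,
		ToOffset:    250,
		Created:     time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
}

func encode(r RecoveryRequest) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

func decode(s string) (RecoveryRequest, error) {
	var r RecoveryRequest
	err := json.Unmarshal([]byte(s), &r)
	return r, err
}

func main() {
	s, err := encode(sampleRequest())
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
	// {"partition_id":3,"from_offset":100,"to_offset":250,"created":"2024-01-02T03:04:05Z"}
}
```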
type RecoveryRequests ¶
type RecoveryRequests struct {
Requests []*RecoveryRequest `json:"recovery_requests"`
}
RecoveryRequests is the ordered list of active recoveries for a single partition.
type RecoveryTracker ¶
type RecoveryTracker struct {
// contains filtered or unexported fields
}
RecoveryTracker uses a Kafka compact topic to persist requests for partition data recovery. It caches the most recent recoveryRequests.
func NewRecoveryTracker ¶
func NewRecoveryTracker(metrics *Metrics, ctx fbcontext.FBContext) (*RecoveryTracker, error)
NewRecoveryTracker creates a RecoveryTracker which uses the messaging framework to manage the recovery process
func (*RecoveryTracker) AddRecoveryRequest ¶
func (rt *RecoveryTracker) AddRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error
AddRecoveryRequest creates and persists a new recovery request for the passed partition
func (*RecoveryTracker) GetRecoveryRequest ¶
func (rt *RecoveryTracker) GetRecoveryRequest(partitionID int32) *RecoveryRequest
GetRecoveryRequest returns the most recent RecoveryRequest for the partition passed. It may return 'nil' if either no RecoveryRequest has ever been created for this partition or if the most recent RecoveryRequest for the partition has been marked complete.
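The "latest request per partition, or nil" behaviour resembles a cache keyed by partition ID in which each new record replaces the previous one. The sketch below models that with hypothetical types; it assumes, purely for illustration, that a completed recovery is recorded as an update whose from offset has reached its to offset, which this documentation does not confirm.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for a persisted recovery request.
type request struct {
	PartitionID int32
	FromOffset  int64
	ToOffset    int64
}

// requestCache keeps only the most recent request per partition, mirroring
// how a compacted topic retains the latest record for each key.
type requestCache struct {
	mu     sync.RWMutex
	latest map[int32]request
}

func newRequestCache() *requestCache {
	return &requestCache{latest: make(map[int32]request)}
}

// apply records r as the latest request for its partition.
func (c *requestCache) apply(r request) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.latest[r.PartitionID] = r
}

// get returns the latest request for the partition, or nil if none exists or
// if it is complete (ASSUMPTION: complete means FromOffset >= ToOffset).
func (c *requestCache) get(partitionID int32) *request {
	c.mu.RLock()
	defer c.mu.RUnlock()
	r, ok := c.latest[partitionID]
	if !ok || r.FromOffset >= r.ToOffset {
		return nil
	}
	return &r
}

func main() {
	cache := newRequestCache()
	fmt.Println(cache.get(1) == nil) // true: nothing requested yet

	cache.apply(request{PartitionID: 1, FromOffset: 100, ToOffset: 900})
	fmt.Println(cache.get(1) == nil) // false: recovery pending

	cache.apply(request{PartitionID: 1, FromOffset: 900, ToOffset: 900})
	fmt.Println(cache.get(1) == nil) // true: treated as complete in this sketch
}
```

Callers in this model must always check for nil before using the returned request, just as with GetRecoveryRequest.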
func (*RecoveryTracker) MarkRecoveryComplete ¶
func (rt *RecoveryTracker) MarkRecoveryComplete(partitionID int32, toOffset int64) error
MarkRecoveryComplete sends an updated RecoveryRequest message to mark the recovery complete.
func (*RecoveryTracker) RecoveryRequestCount ¶
func (rt *RecoveryTracker) RecoveryRequestCount() int
RecoveryRequestCount returns the number of currently tracked recovery requests.
func (*RecoveryTracker) Shutdown ¶
func (rt *RecoveryTracker) Shutdown()
Shutdown stops the recovery tracker
func (*RecoveryTracker) UpdateRecoveryRequest ¶
func (rt *RecoveryTracker) UpdateRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error
UpdateRecoveryRequest persists the passed request to recover skipped data for a partition to a Kafka compact topic.