Documentation ¶
Index ¶
Constants ¶
View Source
const OffsetTableName = "consumer_offset"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerOffset ¶
type ConsumerOffset struct { ConsumerGroup string `gorm:"column:consumer_group;primary_key"` Topic string `gorm:"column:topic;primary_key"` KafkaPartition int32 `gorm:"column:kafka_partition;primary_key"` Offset int64 `gorm:"column:offset"` Ts time.Time `gorm:"column:ts"` Metadata string `gorm:"column:metadata"` }
func (ConsumerOffset) TableName ¶
func (ConsumerOffset) TableName() string
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { ConsumerGroup string ConsumerGroupGeneration int32 // nothing to do ConsumerID string // nothing to do RetentionTime int64 // nothing to do // contains filtered or unexported fields }
OffsetCommitRequest contains configuration options for requesting commit offset.
func (*OffsetCommitRequest) AddBlock ¶
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)
AddBlock to set commit offset of request block
func (*OffsetCommitRequest) Blocks ¶
func (r *OffsetCommitRequest) Blocks() map[string]map[int32]*OffsetCommitRequestBlock
type OffsetCommitResponse ¶
OffsetCommitResponse is the value for response of commit offset
func SaveOffsetToDB ¶
func SaveOffsetToDB(db *sql.DB, fullTableName string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
type OffsetFetchRequest ¶
type OffsetFetchRequest struct { ConsumerGroup string // contains filtered or unexported fields }
OffsetFetchRequest contains configuration options for request of fetch offset
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
AddPartition to set partition for request
func (*OffsetFetchRequest) Partitions ¶
func (r *OffsetFetchRequest) Partitions() map[string][]int32
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { Blocks map[string]map[int32]*OffsetFetchResponseBlock LastUpdate time.Time //only for db position store }
OffsetFetchResponse contains configuration options for response of fetch offset
func FetchOffsetFromDB ¶
func FetchOffsetFromDB(db *sql.DB, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partitionID int32, offset int64, metadata string)
AddBlock to set fetch offset of response block
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
GetBlock to get fetch offset of response block
type OffsetFetchResponseBlock ¶
OffsetFetchResponseBlock contains configuration options for response of fetch offset block
Click to show internal directories.
Click to hide internal directories.