offsets

package
v0.9.30 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2019 License: Apache-2.0 Imports: 7 Imported by: 2

Documentation

Index

Constants

View Source
const OffsetTableName = "consumer_offset"

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerOffset

type ConsumerOffset struct {
	ConsumerGroup  string    `gorm:"column:consumer_group;primary_key"`
	Topic          string    `gorm:"column:topic;primary_key"`
	KafkaPartition int32     `gorm:"column:kafka_partition;primary_key"`
	Offset         int64     `gorm:"column:offset"`
	Ts             time.Time `gorm:"column:ts"`
	Metadata       string    `gorm:"column:metadata"`
}

func (ConsumerOffset) TableName

func (ConsumerOffset) TableName() string

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32  // nothing to do
	ConsumerID              string // nothing to do
	RetentionTime           int64  // nothing to do
	// contains filtered or unexported fields
}

OffsetCommitRequest contains configuration options for requesting commit offset.

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

AddBlock to set commit offset of request block

func (*OffsetCommitRequest) Blocks

type OffsetCommitRequestBlock

type OffsetCommitRequestBlock struct {
	Offset    int64
	Timestamp int64
	Metadata  string
}

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]error
}

OffsetCommitResponse is the value for response of commit offset

func SaveOffsetToDB

func SaveOffsetToDB(db *sql.DB, fullTableName string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*OffsetCommitResponse) AddError

func (r *OffsetCommitResponse) AddError(topic string, partition int32, err error)

AddError to set err for response of commit offset

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

OffsetFetchRequest contains configuration options for request of fetch offset

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

AddPartition to set partition for request

func (*OffsetFetchRequest) Partitions

func (r *OffsetFetchRequest) Partitions() map[string][]int32

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks     map[string]map[int32]*OffsetFetchResponseBlock
	LastUpdate time.Time //only for db position store
}

OffsetFetchResponse contains configuration options for response of fetch offset

func FetchOffsetFromDB

func FetchOffsetFromDB(db *sql.DB, request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*OffsetFetchResponse) AddBlock

func (r *OffsetFetchResponse) AddBlock(topic string, partitionID int32, offset int64, metadata string)

AddBlock to set fetch offset of response block

func (*OffsetFetchResponse) GetBlock

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

GetBlock to get fetch offset of response block

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      error
}

OffsetFetchResponseBlock contains configuration options for response of fetch offset block

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL