Documentation ¶
Index ¶
- Constants
- type Admin
- type Assignment
- type ConsumerBuilder
- type ConsumerConfig
- type ConsumerOffset
- type ConsumerProvider
- type DeliveryReport
- type Error
- type Event
- type GroupConsumer
- type GroupConsumerBuilder
- type GroupConsumerConfig
- type GroupConsumerProvider
- type GroupConsumerStatus
- type GroupMeta
- type GroupSession
- type IsolationLevel
- type Offset
- type OffsetManager
- type OffsetManagerBuilder
- type OffsetManagerConfig
- type Partition
- type PartitionClaim
- type PartitionConf
- type PartitionConsumer
- type PartitionEnd
- type PartitionerFunc
- type PartitionerType
- type Producer
- type ProducerBuilder
- type ProducerConfig
- type ProducerErr
- type ProducerFactory
- type ProducerProvider
- type RebalanceHandler
- type Record
- type RecordContextBinderFunc
- type RecordHeader
- type RecordHeaders
- type RecordMeta
- type RequiredAcks
- type Topic
- type TopicConfig
- type TopicMeta
- type TopicPartition
- type TopicPartitions
- type TransactionalProducer
Constants ¶
View Source
const PartitionAny = -1
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assignment ¶
type Assignment interface { TPs() TopicPartitions ResetOffset(tp TopicPartition, offset Offset) }
type ConsumerBuilder ¶
type ConsumerBuilder func(func(config *ConsumerConfig)) (PartitionConsumer, error)
type ConsumerConfig ¶
type ConsumerConfig struct { Id string BootstrapServers []string IsolationLevel IsolationLevel TopicMetaFetchTimeout time.Duration EOSEnabled bool MaxPollInterval time.Duration ConsumerMessageChanSize int Logger log.Logger MetricsReporter metrics.Reporter ContextExtractor RecordContextBinderFunc }
func NewPartitionConsumerConfig ¶
func NewPartitionConsumerConfig() *ConsumerConfig
func (*ConsumerConfig) Copy ¶
func (conf *ConsumerConfig) Copy() *ConsumerConfig
type ConsumerOffset ¶
func (*ConsumerOffset) String ¶
func (off *ConsumerOffset) String() string
type ConsumerProvider ¶
type ConsumerProvider interface {
NewBuilder(config *ConsumerConfig) ConsumerBuilder
}
type DeliveryReport ¶
type GroupConsumer ¶
type GroupConsumer interface { // Subscribe subscribes to a list of topics with a user-provided RebalanceHandler Subscribe(tps []string, handler RebalanceHandler) error // Unsubscribe signals the consumer to unsubscribe from group Unsubscribe() error Errors() <-chan error }
GroupConsumer is a wrapper for a Kafka group consumer adaptor.
type GroupConsumerBuilder ¶
type GroupConsumerBuilder func(func(config *GroupConsumerConfig)) (GroupConsumer, error)
type GroupConsumerConfig ¶
type GroupConsumerConfig struct { *ConsumerConfig GroupId string Offsets struct { Initial Offset Commit struct { Auto bool Interval time.Duration } } }
func NewConfig ¶
func NewConfig() *GroupConsumerConfig
func (*GroupConsumerConfig) Copy ¶
func (conf *GroupConsumerConfig) Copy() *GroupConsumerConfig
type GroupConsumerProvider ¶
type GroupConsumerProvider interface {
NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}
type GroupConsumerStatus ¶
type GroupConsumerStatus string
const ( ConsumerPending GroupConsumerStatus = `Pending` ConsumerRebalancing GroupConsumerStatus = `Rebalancing` ConsumerReady GroupConsumerStatus = `Ready` )
type GroupMeta ¶
type GroupMeta struct {
Meta interface{}
}
GroupMeta wraps consumer group metadata used in transactional producer commits.
type GroupSession ¶
type IsolationLevel ¶
type IsolationLevel int8
const ( ReadUncommitted IsolationLevel = iota ReadCommitted )
type OffsetManager ¶
type OffsetManagerBuilder ¶
type OffsetManagerBuilder func(func(config *OffsetManagerConfig)) (OffsetManager, error)
type OffsetManagerConfig ¶
type OffsetManagerConfig struct { Id string BootstrapServers []string Logger log.Logger MetricsReporter metrics.Reporter }
func NewOffsetManagerConfig ¶
func NewOffsetManagerConfig() *OffsetManagerConfig
type PartitionClaim ¶
type PartitionClaim interface { TopicPartition() TopicPartition Records() <-chan Record }
type PartitionConf ¶
type PartitionConsumer ¶
type PartitionConsumer interface { ConsumeTopic(ctx context.Context, topic string, offset Offset) (map[int32]Partition, error) Partitions(ctx context.Context, topic string) ([]int32, error) ConsumePartition(ctx context.Context, topic string, partition int32, offset Offset) (Partition, error) OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error) GetOffsetLatest(topic string, partition int32) (offset int64, err error) GetOffsetOldest(topic string, partition int32) (offset int64, err error) Close() error }
type PartitionEnd ¶
type PartitionEnd struct {
Tps []TopicPartition
}
func (*PartitionEnd) String ¶
func (p *PartitionEnd) String() string
func (*PartitionEnd) TopicPartitions ¶
func (p *PartitionEnd) TopicPartitions() []TopicPartition
type PartitionerType ¶
type PartitionerType string
type ProducerBuilder ¶
type ProducerBuilder func(conf func(*ProducerConfig)) (Producer, error)
type ProducerConfig ¶
type ProducerConfig struct { Id string BootstrapServers []string PartitionerFunc PartitionerFunc Acks RequiredAcks Transactional struct { Enabled bool Id string } Idempotent bool Logger log.Logger MetricsReporter metrics.Reporter }
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
func (*ProducerConfig) Copy ¶
func (conf *ProducerConfig) Copy() *ProducerConfig
type ProducerErr ¶
type ProducerFactory ¶
type ProducerFactory interface {
NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}
type ProducerProvider ¶
type ProducerProvider interface {
NewBuilder(config *ProducerConfig) ProducerBuilder
}
type RebalanceHandler ¶
type RebalanceHandler interface { OnPartitionRevoked(ctx context.Context, session GroupSession) error OnPartitionAssigned(ctx context.Context, session GroupSession) error OnLost() error Consume(ctx context.Context, session GroupSession, partition PartitionClaim) error }
type RecordContextBinderFunc ¶
type RecordHeader ¶
RecordHeader stores key and value for a record header.
type RecordHeaders ¶
type RecordHeaders []RecordHeader
RecordHeaders is a list of key:value pairs.
func (RecordHeaders) Read ¶
func (h RecordHeaders) Read(key []byte) []byte
Read returns the value of the header with the given key, or nil if no such header exists.
type RecordMeta ¶
type RequiredAcks ¶
type RequiredAcks int
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLeader waits for only the local commit to succeed before responding. WaitForLeader RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 )
func (RequiredAcks) String ¶
func (ack RequiredAcks) String() string
type TopicConfig ¶
type TopicMeta ¶
type TopicMeta []TopicPartition
type TopicPartition ¶
TopicPartition represents a Kafka topic partition.
func (TopicPartition) String ¶
func (tp TopicPartition) String() string
type TopicPartitions ¶
type TopicPartitions []TopicPartition
func (TopicPartitions) Less ¶
func (list TopicPartitions) Less(i, j int) bool
Less is part of sort.Interface.
func (TopicPartitions) Swap ¶
func (list TopicPartitions) Swap(i, j int)
Swap is part of sort.Interface.
type TransactionalProducer ¶
type TransactionalProducer interface { Producer ProduceAsync(ctx context.Context, record Record) (err error) InitTransactions(ctx context.Context) error BeginTransaction() error SendOffsetsToTransaction(ctx context.Context, offsets []ConsumerOffset, meta *GroupMeta) error CommitTransaction(ctx context.Context) error AbortTransaction(ctx context.Context) error }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.