Documentation ¶
Index ¶
- type BalancerStrategy
- type Client
- func (client *Client) CreateTopic(topic string, replica, partitions int) error
- func (client *Client) NewConsumer(topic string, configs ...*ConsumerConfig) *Consumer
- func (client *Client) NewProducer(topic string, configs ...*ProducerConfig) *Producer
- func (client *Client) Partitions() ([]Partition, error)
- func (client *Client) Topics() ([]string, error)
- type Compression
- type Consumer
- type ConsumerConfig
- func (c *ConsumerConfig) Brokers(input []string) *ConsumerConfig
- func (c *ConsumerConfig) CommitInterval(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) GroupID(input string) *ConsumerConfig
- func (c *ConsumerConfig) GroupTopics(input []string) *ConsumerConfig
- func (c *ConsumerConfig) HeartbeatInterval(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) JoinGroupBackoff(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) MaxAttempts(input int) *ConsumerConfig
- func (c *ConsumerConfig) MaxBytes(input int) *ConsumerConfig
- func (c *ConsumerConfig) MaxWait(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) MinBytes(input int) *ConsumerConfig
- func (c *ConsumerConfig) Partition(input int) *ConsumerConfig
- func (c *ConsumerConfig) PartitionWatchInterval(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) QueueCapacity(input int) *ConsumerConfig
- func (c *ConsumerConfig) ReadBackoffMax(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) ReadBackoffMin(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) ReadLagInterval(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) RebalanceTimeout(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) RetentionTime(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) SessionTimeout(input time.Duration) *ConsumerConfig
- func (c *ConsumerConfig) StartOffset(input int64) *ConsumerConfig
- func (c *ConsumerConfig) Topic(input string) *ConsumerConfig
- func (c *ConsumerConfig) WatchPartitionChanges(input bool) *ConsumerConfig
- type ConsumerHandler
- type Message
- type Partition
- type Producer
- type ProducerConfig
- func (c *ProducerConfig) Ack(input RequiredAcks) *ProducerConfig
- func (c *ProducerConfig) Async(input bool) *ProducerConfig
- func (c *ProducerConfig) Balancer(input BalancerStrategy) *ProducerConfig
- func (c *ProducerConfig) BatchBytes(input int64) *ProducerConfig
- func (c *ProducerConfig) BatchSize(input int) *ProducerConfig
- func (c *ProducerConfig) BatchTimeout(input time.Duration) *ProducerConfig
- func (c *ProducerConfig) Compression(input Compression) *ProducerConfig
- func (c *ProducerConfig) MaxAttempts(input int) *ProducerConfig
- func (c *ProducerConfig) ReadTimeout(input time.Duration) *ProducerConfig
- func (c *ProducerConfig) WriteTimeout(input time.Duration) *ProducerConfig
- type RequiredAcks
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BalancerStrategy ¶
type BalancerStrategy int
const (
	MurMur2    BalancerStrategy = 0
	CRC32      BalancerStrategy = 1
	Hash       BalancerStrategy = 2
	LeastBytes BalancerStrategy = 3
	RoundRobin BalancerStrategy = 4
)
type Client ¶
type Client struct {
	Brokers []string
	// contains filtered or unexported fields
}
func (*Client) CreateTopic ¶
func (client *Client) CreateTopic(topic string, replica, partitions int) error
func (*Client) NewConsumer ¶
func (client *Client) NewConsumer(topic string, configs ...*ConsumerConfig) *Consumer
func (*Client) NewProducer ¶
func (client *Client) NewProducer(topic string, configs ...*ProducerConfig) *Producer
func (*Client) Partitions ¶
func (client *Client) Partitions() ([]Partition, error)
type Compression ¶
type Compression = kafka.Compression
const (
	Gzip   Compression = compress.Gzip
	Snappy Compression = compress.Snappy
	Lz4    Compression = compress.Lz4
	Zstd   Compression = compress.Zstd
)
type Consumer ¶
type Consumer struct {
	Config          *kafka.ReaderConfig
	Reader          *kafka.Reader
	MessageHandlers []ConsumerHandler
	ErrorHandler    func(err error) bool
	Context         context.Context
}
func NewConsumer ¶
func NewConsumer(servers, topic string, configs ...*ConsumerConfig) *Consumer
func (*Consumer) OnMessage ¶
func (c *Consumer) OnMessage(fn ConsumerHandler)
type ConsumerConfig ¶
type ConsumerConfig struct {
// contains filtered or unexported fields
}
func NewConsumerConfig ¶
func NewConsumerConfig() *ConsumerConfig
func (*ConsumerConfig) Brokers ¶
func (c *ConsumerConfig) Brokers(input []string) *ConsumerConfig
func (*ConsumerConfig) CommitInterval ¶
func (c *ConsumerConfig) CommitInterval(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) GroupID ¶
func (c *ConsumerConfig) GroupID(input string) *ConsumerConfig
func (*ConsumerConfig) GroupTopics ¶
func (c *ConsumerConfig) GroupTopics(input []string) *ConsumerConfig
func (*ConsumerConfig) HeartbeatInterval ¶
func (c *ConsumerConfig) HeartbeatInterval(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) JoinGroupBackoff ¶
func (c *ConsumerConfig) JoinGroupBackoff(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) MaxAttempts ¶
func (c *ConsumerConfig) MaxAttempts(input int) *ConsumerConfig
func (*ConsumerConfig) MaxBytes ¶
func (c *ConsumerConfig) MaxBytes(input int) *ConsumerConfig
func (*ConsumerConfig) MaxWait ¶
func (c *ConsumerConfig) MaxWait(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) MinBytes ¶
func (c *ConsumerConfig) MinBytes(input int) *ConsumerConfig
func (*ConsumerConfig) Partition ¶
func (c *ConsumerConfig) Partition(input int) *ConsumerConfig
func (*ConsumerConfig) PartitionWatchInterval ¶
func (c *ConsumerConfig) PartitionWatchInterval(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) QueueCapacity ¶
func (c *ConsumerConfig) QueueCapacity(input int) *ConsumerConfig
func (*ConsumerConfig) ReadBackoffMax ¶
func (c *ConsumerConfig) ReadBackoffMax(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) ReadBackoffMin ¶
func (c *ConsumerConfig) ReadBackoffMin(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) ReadLagInterval ¶
func (c *ConsumerConfig) ReadLagInterval(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) RebalanceTimeout ¶
func (c *ConsumerConfig) RebalanceTimeout(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) RetentionTime ¶
func (c *ConsumerConfig) RetentionTime(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) SessionTimeout ¶
func (c *ConsumerConfig) SessionTimeout(input time.Duration) *ConsumerConfig
func (*ConsumerConfig) StartOffset ¶
func (c *ConsumerConfig) StartOffset(input int64) *ConsumerConfig
func (*ConsumerConfig) Topic ¶
func (c *ConsumerConfig) Topic(input string) *ConsumerConfig
func (*ConsumerConfig) WatchPartitionChanges ¶
func (c *ConsumerConfig) WatchPartitionChanges(input bool) *ConsumerConfig
type ConsumerHandler ¶
type ConsumerHandler func(message Message)
type Producer ¶
func NewProducer ¶
func NewProducer(servers, topic string, configs ...*ProducerConfig) *Producer
type ProducerConfig ¶
type ProducerConfig struct {
// contains filtered or unexported fields
}
func NewProducerConfig ¶
func NewProducerConfig() *ProducerConfig
func (*ProducerConfig) Ack ¶
func (c *ProducerConfig) Ack(input RequiredAcks) *ProducerConfig
func (*ProducerConfig) Async ¶
func (c *ProducerConfig) Async(input bool) *ProducerConfig
func (*ProducerConfig) Balancer ¶
func (c *ProducerConfig) Balancer(input BalancerStrategy) *ProducerConfig
func (*ProducerConfig) BatchBytes ¶
func (c *ProducerConfig) BatchBytes(input int64) *ProducerConfig
func (*ProducerConfig) BatchSize ¶
func (c *ProducerConfig) BatchSize(input int) *ProducerConfig
func (*ProducerConfig) BatchTimeout ¶
func (c *ProducerConfig) BatchTimeout(input time.Duration) *ProducerConfig
func (*ProducerConfig) Compression ¶
func (c *ProducerConfig) Compression(input Compression) *ProducerConfig
func (*ProducerConfig) MaxAttempts ¶
func (c *ProducerConfig) MaxAttempts(input int) *ProducerConfig
func (*ProducerConfig) ReadTimeout ¶
func (c *ProducerConfig) ReadTimeout(input time.Duration) *ProducerConfig
func (*ProducerConfig) WriteTimeout ¶
func (c *ProducerConfig) WriteTimeout(input time.Duration) *ProducerConfig
type RequiredAcks ¶
type RequiredAcks kafka.RequiredAcks
const (
	RequireNone RequiredAcks = 0
	RequireOne  RequiredAcks = 1
	RequireAll  RequiredAcks = -1
)
Click to show internal directories.
Click to hide internal directories.