Documentation ¶
Index ¶
- Variables
- func ParseDSN(dsn string) (zone, cluster, topic string, partitionID int32, err error)
- type Config
- func (c *Config) Ack(ack sarama.RequiredAcks) *Config
- func (c *Config) AsyncMode() *Config
- func (c *Config) DryrunMode() *Config
- func (c *Config) LossTolerant() *Config
- func (c *Config) SetConsumerChanBuffer(size int) *Config
- func (c *Config) SyncMode() *Config
- func (c *Config) ThroughputFirst() *Config
- type Consumer
- type Producer
- type QoS
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotReady       = errors.New("not ready")
	ErrNotAllowed     = errors.New("not allowed")
	ErrStopping       = errors.New("stopping")
	ErrAlreadyClosed  = errors.New("consumer already closed")
	ErrConsumerBroken = errors.New("consumer conn broken")
)
var InvalidPartitionID = int32(-1)
Functions ¶
Types ¶
type Config ¶
Config is the configuration of the kafka package.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig creates a default Config, which is set up for an async producer.
func (*Config) Ack ¶
func (c *Config) Ack(ack sarama.RequiredAcks) *Config
Ack sets the required acks parameter of the kafka producer.
func (*Config) DryrunMode ¶
DryrunMode switches kafka to dryrun mode.
func (*Config) LossTolerant ¶
LossTolerant sets the QoS to loss tolerant.
func (*Config) SetConsumerChanBuffer ¶
SetConsumerChanBuffer sets the channel buffer size of the consumer.
func (*Config) ThroughputFirst ¶
ThroughputFirst sets the QoS to throughput first.
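Every Config setter listed above returns *Config, which suggests the methods are meant to be chained. The sketch below illustrates that pattern only. Config, its fields, and the method bodies are hypothetical stand-ins (the real fields are unexported and not shown here), and RequiredAcks is a local substitute for sarama.RequiredAcks so the example runs without external dependencies.

```go
package main

import "fmt"

// RequiredAcks is a local stand-in for sarama.RequiredAcks.
type RequiredAcks int16

// WaitForAll mirrors the value of sarama.WaitForAll.
const WaitForAll RequiredAcks = -1

// Config is a hypothetical stand-in; the field names are invented for
// illustration and do not reflect the real struct.
type Config struct {
	async       bool
	acks        RequiredAcks
	consumerBuf int
}

// DefaultConfig mirrors the documented default of an async producer.
func DefaultConfig() *Config { return &Config{async: true} }

// Each setter mutates the receiver and returns it, enabling chaining.
func (c *Config) SyncMode() *Config                   { c.async = false; return c }
func (c *Config) Ack(a RequiredAcks) *Config          { c.acks = a; return c }
func (c *Config) SetConsumerChanBuffer(n int) *Config { c.consumerBuf = n; return c }

func main() {
	cfg := DefaultConfig().SyncMode().Ack(WaitForAll).SetConsumerChanBuffer(256)
	fmt.Printf("%+v\n", *cfg)
}
```

Because each call returns the same pointer, the chain modifies a single Config value rather than producing copies.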
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a low-level kafka consumer that can consume from kafka clusters in multiple zones/clusters.
func NewConsumer ¶
NewConsumer returns a kafka consumer.
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
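Messages returns a receive-only channel, so a typical caller would likely read from it in a range loop. The following sketch shows that loop under stated assumptions: ConsumerMessage is a trimmed local stand-in for sarama.ConsumerMessage, handleAll is a hypothetical helper, and the channel is closed by hand here. Whether the real Consumer closes its channel on shutdown is not stated on this page.

```go
package main

import "fmt"

// ConsumerMessage is a trimmed local stand-in for sarama.ConsumerMessage.
type ConsumerMessage struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// handleAll is a hypothetical helper. It ranges over a receive-only channel
// (the shape Messages() returns), calls handle for each message, and
// returns how many messages were processed before the channel was closed.
func handleAll(msgs <-chan *ConsumerMessage, handle func(*ConsumerMessage)) int {
	n := 0
	for m := range msgs {
		handle(m)
		n++
	}
	return n
}

func main() {
	// A buffered channel, in the spirit of SetConsumerChanBuffer.
	ch := make(chan *ConsumerMessage, 2)
	ch <- &ConsumerMessage{Topic: "events", Partition: 0, Offset: 1, Value: []byte("a")}
	ch <- &ConsumerMessage{Topic: "events", Partition: 0, Offset: 2, Value: []byte("b")}
	close(ch)

	n := handleAll(ch, func(m *ConsumerMessage) {
		fmt.Printf("%s/%d@%d: %s\n", m.Topic, m.Partition, m.Offset, m.Value)
	})
	fmt.Println("processed", n)
}
```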
type Producer ¶
type Producer struct {
	// Send will send a kafka message.
	Send func(*sarama.ProducerMessage) error
	// contains filtered or unexported fields
}
Producer is a uniform kafka producer that hides the difference between sync and async mode.
func NewProducer ¶
NewProducer creates a uniform kafka producer.
func (*Producer) SetErrorHandler ¶
func (p *Producer) SetErrorHandler(f func(err *sarama.ProducerError)) error
SetErrorHandler sets the callback for unretriable errors from the async producer, e.g. ErrInvalidPartition, ErrMessageSizeTooLarge, ErrIncompleteResponse, and ErrBreakerOpen (e.g. when updating the leader fails). It is *REQUIRED* for an async producer and not allowed for a sync producer.
func (*Producer) SetSuccessHandler ¶
func (p *Producer) SetSuccessHandler(f func(err *sarama.ProducerMessage)) error
SetSuccessHandler sets the callback invoked for each successfully produced message on an async producer. It is *REQUIRED* for an async producer and not allowed for a sync producer.