Documentation ¶
Overview ¶
Package kafka is a Kafka client package.
Index ¶
- type AsyncProducer
- type AsyncProducerOption
- func AsyncProducerWithClientID(clientID string) AsyncProducerOption
- func AsyncProducerWithConfig(config *sarama.Config) AsyncProducerOption
- func AsyncProducerWithFlushBytes(flushBytes int) AsyncProducerOption
- func AsyncProducerWithFlushFrequency(flushFrequency time.Duration) AsyncProducerOption
- func AsyncProducerWithFlushMessages(flushMessages int) AsyncProducerOption
- func AsyncProducerWithHandleFailed(handleFailedFn AsyncSendFailedHandlerFn) AsyncProducerOption
- func AsyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) AsyncProducerOption
- func AsyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) AsyncProducerOption
- func AsyncProducerWithReturnSuccesses(returnSuccesses bool) AsyncProducerOption
- func AsyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) AsyncProducerOption
- func AsyncProducerWithVersion(version sarama.KafkaVersion) AsyncProducerOption
- func AsyncProducerWithZapLogger(zapLogger *zap.Logger) AsyncProducerOption
- type AsyncSendFailedHandlerFn
- type Backlog
- type ClientManager
- type Consumer
- type ConsumerGroup
- type ConsumerOption
- func ConsumerWithClientID(clientID string) ConsumerOption
- func ConsumerWithConfig(config *sarama.Config) ConsumerOption
- func ConsumerWithGroupStrategies(groupStrategies ...sarama.BalanceStrategy) ConsumerOption
- func ConsumerWithOffsetsAutoCommitEnable(offsetsAutoCommitEnable bool) ConsumerOption
- func ConsumerWithOffsetsAutoCommitInterval(offsetsAutoCommitInterval time.Duration) ConsumerOption
- func ConsumerWithOffsetsInitial(offsetsInitial int64) ConsumerOption
- func ConsumerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) ConsumerOption
- func ConsumerWithVersion(version sarama.KafkaVersion) ConsumerOption
- func ConsumerWithZapLogger(zapLogger *zap.Logger) ConsumerOption
- type HandleMessageFn
- type Message
- type ProducerMessage
- type SyncProducer
- type SyncProducerOption
- func SyncProducerWithClientID(clientID string) SyncProducerOption
- func SyncProducerWithConfig(config *sarama.Config) SyncProducerOption
- func SyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) SyncProducerOption
- func SyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) SyncProducerOption
- func SyncProducerWithReturnSuccesses(returnSuccesses bool) SyncProducerOption
- func SyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) SyncProducerOption
- func SyncProducerWithVersion(version sarama.KafkaVersion) SyncProducerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { Producer sarama.AsyncProducer // contains filtered or unexported fields }
AsyncProducer is an async producer.
func InitAsyncProducer ¶
func InitAsyncProducer(addrs []string, opts ...AsyncProducerOption) (*AsyncProducer, error)
InitAsyncProducer initializes an async producer.
func (*AsyncProducer) SendData ¶
func (p *AsyncProducer) SendData(topic string, multiData ...interface{}) error
SendData sends messages to a topic with multiple types of data.
func (*AsyncProducer) SendMessage ¶
func (p *AsyncProducer) SendMessage(messages ...*sarama.ProducerMessage) error
SendMessage sends messages to a topic.
type AsyncProducerOption ¶
type AsyncProducerOption func(*asyncProducerOptions)
AsyncProducerOption sets options.
func AsyncProducerWithClientID ¶
func AsyncProducerWithClientID(clientID string) AsyncProducerOption
AsyncProducerWithClientID set clientID.
func AsyncProducerWithConfig ¶
func AsyncProducerWithConfig(config *sarama.Config) AsyncProducerOption
AsyncProducerWithConfig set custom config.
func AsyncProducerWithFlushBytes ¶
func AsyncProducerWithFlushBytes(flushBytes int) AsyncProducerOption
AsyncProducerWithFlushBytes set flushBytes.
func AsyncProducerWithFlushFrequency ¶
func AsyncProducerWithFlushFrequency(flushFrequency time.Duration) AsyncProducerOption
AsyncProducerWithFlushFrequency set flushFrequency.
func AsyncProducerWithFlushMessages ¶
func AsyncProducerWithFlushMessages(flushMessages int) AsyncProducerOption
AsyncProducerWithFlushMessages set flushMessages.
func AsyncProducerWithHandleFailed ¶
func AsyncProducerWithHandleFailed(handleFailedFn AsyncSendFailedHandlerFn) AsyncProducerOption
AsyncProducerWithHandleFailed set handleFailedFn.
func AsyncProducerWithPartitioner ¶
func AsyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) AsyncProducerOption
AsyncProducerWithPartitioner set partitioner.
func AsyncProducerWithRequiredAcks ¶
func AsyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) AsyncProducerOption
AsyncProducerWithRequiredAcks set requiredAcks.
func AsyncProducerWithReturnSuccesses ¶
func AsyncProducerWithReturnSuccesses(returnSuccesses bool) AsyncProducerOption
AsyncProducerWithReturnSuccesses set returnSuccesses.
func AsyncProducerWithTLS ¶
func AsyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) AsyncProducerOption
AsyncProducerWithTLS sets tlsConfig. If isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
func AsyncProducerWithVersion ¶
func AsyncProducerWithVersion(version sarama.KafkaVersion) AsyncProducerOption
AsyncProducerWithVersion set kafka version.
func AsyncProducerWithZapLogger ¶
func AsyncProducerWithZapLogger(zapLogger *zap.Logger) AsyncProducerOption
AsyncProducerWithZapLogger set zapLogger.
type AsyncSendFailedHandlerFn ¶
type AsyncSendFailedHandlerFn func(msg *sarama.ProducerMessage) error
AsyncSendFailedHandlerFn is a function that handles failed messages.
type Backlog ¶
type Backlog struct { Partition int32 `json:"partition"` // partition id Backlog int64 `json:"backlog"` // data backlog NextConsumeOffset int64 `json:"nextOffset"` // offset for next consumption }
Backlog is the backlog info of a partition.
type ClientManager ¶
type ClientManager struct {
// contains filtered or unexported fields
}
ClientManager is a client manager.
func InitClientManager ¶
func InitClientManager(addrs []string, groupID string) (*ClientManager, error)
InitClientManager initializes a client manager.
func (*ClientManager) GetBacklog ¶
func (m *ClientManager) GetBacklog(topic string) (int64, []*Backlog, error)
GetBacklog gets the backlog of a topic.
type Consumer ¶
Consumer consumes partitions.
func InitConsumer ¶
func InitConsumer(addrs []string, opts ...ConsumerOption) (*Consumer, error)
InitConsumer initializes a consumer.
func (*Consumer) ConsumeAllPartition ¶
func (c *Consumer) ConsumeAllPartition(ctx context.Context, topic string, offset int64, handleFn HandleMessageFn)
ConsumeAllPartition consumes all partitions, non-blocking.
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handleFn HandleMessageFn)
ConsumePartition consumes one partition, blocking.
type ConsumerGroup ¶
type ConsumerGroup struct { Group sarama.ConsumerGroup // contains filtered or unexported fields }
ConsumerGroup is a consumer group.
func InitConsumerGroup ¶
func InitConsumerGroup(addrs []string, groupID string, opts ...ConsumerOption) (*ConsumerGroup, error)
InitConsumerGroup initializes a consumer group.
func (*ConsumerGroup) Close ¶
func (c *ConsumerGroup) Close() error
func (*ConsumerGroup) Consume ¶
func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) error
Consume consumes messages.
func (*ConsumerGroup) ConsumeCustom ¶
func (c *ConsumerGroup) ConsumeCustom(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
ConsumeCustom consumes messages with a custom handler; you need to implement the sarama.ConsumerGroupHandler interface.
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption sets options.
func ConsumerWithClientID ¶
func ConsumerWithClientID(clientID string) ConsumerOption
ConsumerWithClientID set clientID.
func ConsumerWithConfig ¶
func ConsumerWithConfig(config *sarama.Config) ConsumerOption
ConsumerWithConfig set custom config.
func ConsumerWithGroupStrategies ¶
func ConsumerWithGroupStrategies(groupStrategies ...sarama.BalanceStrategy) ConsumerOption
ConsumerWithGroupStrategies set groupStrategies.
func ConsumerWithOffsetsAutoCommitEnable ¶
func ConsumerWithOffsetsAutoCommitEnable(offsetsAutoCommitEnable bool) ConsumerOption
ConsumerWithOffsetsAutoCommitEnable set offsetsAutoCommitEnable.
func ConsumerWithOffsetsAutoCommitInterval ¶
func ConsumerWithOffsetsAutoCommitInterval(offsetsAutoCommitInterval time.Duration) ConsumerOption
ConsumerWithOffsetsAutoCommitInterval set offsetsAutoCommitInterval.
func ConsumerWithOffsetsInitial ¶
func ConsumerWithOffsetsInitial(offsetsInitial int64) ConsumerOption
ConsumerWithOffsetsInitial set offsetsInitial.
func ConsumerWithTLS ¶
func ConsumerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) ConsumerOption
ConsumerWithTLS sets tlsConfig. If isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
func ConsumerWithVersion ¶
func ConsumerWithVersion(version sarama.KafkaVersion) ConsumerOption
ConsumerWithVersion set kafka version.
func ConsumerWithZapLogger ¶
func ConsumerWithZapLogger(zapLogger *zap.Logger) ConsumerOption
ConsumerWithZapLogger set zapLogger.
type HandleMessageFn ¶
type HandleMessageFn func(msg *sarama.ConsumerMessage) error
HandleMessageFn is a function that handles a message from a partition consumer
type Message ¶
type Message struct { Topic string `json:"topic"` Data []byte `json:"data"` Key []byte `json:"key"` }
Message is a message to be sent to a topic.
type ProducerMessage ¶
type ProducerMessage = sarama.ProducerMessage
ProducerMessage is sarama ProducerMessage
type SyncProducer ¶
type SyncProducer struct {
Producer sarama.SyncProducer
}
SyncProducer is a sync producer.
func InitSyncProducer ¶
func InitSyncProducer(addrs []string, opts ...SyncProducerOption) (*SyncProducer, error)
InitSyncProducer initializes a sync producer.
func (*SyncProducer) SendData ¶
func (p *SyncProducer) SendData(topic string, data interface{}) (int32, int64, error)
SendData sends a message to a topic with multiple types of data.
func (*SyncProducer) SendMessage ¶
func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)
SendMessage sends a message to a topic.
type SyncProducerOption ¶
type SyncProducerOption func(*syncProducerOptions)
SyncProducerOption sets options.
func SyncProducerWithClientID ¶
func SyncProducerWithClientID(clientID string) SyncProducerOption
SyncProducerWithClientID set clientID.
func SyncProducerWithConfig ¶
func SyncProducerWithConfig(config *sarama.Config) SyncProducerOption
SyncProducerWithConfig set custom config.
func SyncProducerWithPartitioner ¶
func SyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) SyncProducerOption
SyncProducerWithPartitioner set partitioner.
func SyncProducerWithRequiredAcks ¶
func SyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) SyncProducerOption
SyncProducerWithRequiredAcks set requiredAcks.
func SyncProducerWithReturnSuccesses ¶
func SyncProducerWithReturnSuccesses(returnSuccesses bool) SyncProducerOption
SyncProducerWithReturnSuccesses set returnSuccesses.
func SyncProducerWithTLS ¶
func SyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) SyncProducerOption
SyncProducerWithTLS sets tlsConfig. If isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
func SyncProducerWithVersion ¶
func SyncProducerWithVersion(version sarama.KafkaVersion) SyncProducerOption
SyncProducerWithVersion set kafka version.