Documentation ¶
Overview ¶
Package kafka Date: 2023/11/30 15:02 Author: Amu Description:
Package kafka Date: 2023/11/30 15:03 Author: Amu Description:
Package kafka Date: 2023/11/30 15:05 Author: Amu Description:
Package kafka Date: 2023/11/24 17:35 Author: Amu Description:
Package kafka Date: 2023/11/30 15:01:29 Author: Amu Description:
Index ¶
- Constants
- Variables
- type ByteEncoder
- type Consumer
- type ConsumerMessage
- type Manager
- func (m *Manager) Cleanup(sarama.ConsumerGroupSession) error
- func (m *Manager) Close() error
- func (m *Manager) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error
- func (m *Manager) IsAutoCommit() bool
- func (m *Manager) SendMessage(topic string, key, value sarama.Encoder)
- func (m *Manager) Setup(sarama.ConsumerGroupSession) error
- type Option
- func WithAutoSubmit(auto bool) Option
- func WithClientID(CID string) Option
- func WithConsumerBrokers(brokers []string) Option
- func WithConsumerGroup(group string) Option
- func WithConsumerMessageBufferSize(size int64) Option
- func WithConsumerOffsetInitial(offset int64) Option
- func WithConsumerOffsetsRetryMax(retryMax int) Option
- func WithConsumerTopics(topics []string) Option
- func WithPassword(password string) Option
- func WithProducerBrokers(brokers []string) Option
- func WithProducerRequiredAcks(acks sarama.RequiredAcks) Option
- func WithProducerRetryMax(retryMax int) Option
- func WithUsername(username string) Option
- func WithVersion(version sarama.KafkaVersion) Option
- func WituConsumerRebalanceStrategy(rebalanceStrategy string) Option
- type StringEncoder
Constants ¶
View Source
const ( OffsetNewest = sarama.OffsetNewest OffsetOldest = sarama.OffsetOldest WaitNone = sarama.NoResponse WaitLeader = sarama.WaitForLocal WaitAll = sarama.WaitForAll )
Variables ¶
View Source
var ( V0_8_2_0 = sarama.V0_8_2_0 V0_8_2_1 = sarama.V0_8_2_1 V0_8_2_2 = sarama.V0_8_2_2 V0_9_0_0 = sarama.V0_9_0_0 V0_9_0_1 = sarama.V0_9_0_1 V0_10_0_0 = sarama.V0_10_0_0 V0_10_0_1 = sarama.V0_10_0_1 V0_10_1_0 = sarama.V0_10_1_0 V0_10_1_1 = sarama.V0_10_0_1 V0_10_2_0 = sarama.V0_10_2_0 V0_10_2_1 = sarama.V0_10_2_1 V0_10_2_2 = sarama.V0_10_2_2 V0_11_0_0 = sarama.V0_11_0_0 V0_11_0_1 = sarama.V0_11_0_1 V0_11_0_2 = sarama.V0_11_0_2 V1_0_0_0 = sarama.V1_0_0_0 V1_0_1_0 = sarama.V1_0_1_0 V1_0_2_0 = sarama.V1_0_2_0 V1_1_0_0 = sarama.V1_1_0_0 V1_1_1_0 = sarama.V1_1_1_0 V2_0_0_0 = sarama.V2_0_0_0 V2_0_1_0 = sarama.V2_0_1_0 V2_1_0_0 = sarama.V2_1_0_0 V2_1_1_0 = sarama.V2_1_1_0 V2_2_0_0 = sarama.V2_2_0_0 V2_2_1_0 = sarama.V2_2_1_0 V2_2_2_0 = sarama.V2_2_2_0 V2_3_0_0 = sarama.V2_3_0_0 V2_3_1_0 = sarama.V2_3_1_0 V2_4_0_0 = sarama.V2_4_0_0 V2_4_1_0 = sarama.V2_4_1_0 V2_5_0_0 = sarama.V2_5_0_0 V2_5_1_0 = sarama.V2_5_1_0 V2_6_0_0 = sarama.V2_6_0_0 V2_6_1_0 = sarama.V2_6_1_0 V2_6_2_0 = sarama.V2_6_2_0 V2_7_0_0 = sarama.V2_7_0_0 V2_7_1_0 = sarama.V2_7_1_0 V2_8_0_0 = sarama.V2_8_0_0 V2_8_1_0 = sarama.V2_8_1_0 V2_8_2_0 = sarama.V2_8_2_0 V3_0_0_0 = sarama.V3_0_0_0 V3_0_1_0 = sarama.V3_0_1_0 V3_0_2_0 = sarama.V3_0_2_0 V3_1_0_0 = sarama.V3_1_0_0 V3_1_1_0 = sarama.V3_1_1_0 V3_1_2_0 = sarama.V3_1_2_0 V3_2_0_0 = sarama.V3_2_0_0 V3_2_1_0 = sarama.V3_2_1_0 V3_2_2_0 = sarama.V3_2_2_0 V3_2_3_0 = sarama.V3_2_3_0 V3_3_0_0 = sarama.V3_3_0_0 V3_3_1_0 = sarama.V3_3_1_0 V3_3_2_0 = sarama.V3_3_2_0 V3_4_0_0 = sarama.V3_4_0_0 V3_4_1_0 = sarama.V3_4_1_0 V3_5_0_0 = sarama.V3_5_0_0 V3_5_1_0 = sarama.V3_5_1_0 V3_6_0_0 = sarama.V3_6_0_0 )
Functions ¶
This section is empty.
Types ¶
type ByteEncoder ¶
type ByteEncoder []byte
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type Consumer ¶
type Consumer interface { GetMsg() *sarama.ConsumerMessage Submit() }
type ConsumerMessage ¶
type ConsumerMessage *sarama.ConsumerMessage
type Manager ¶
type Manager struct { Producer sarama.AsyncProducer ConsumerGroup sarama.ConsumerGroup ConsumerMessages chan ConsumerMessage Consumers chan Consumer // contains filtered or unexported fields }
func (*Manager) Cleanup ¶
func (m *Manager) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Manager) ConsumeClaim ¶
func (m *Manager) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error
ConsumeClaim push message
func (*Manager) IsAutoCommit ¶
func (*Manager) SendMessage ¶
SendMessage 发送一条<key,value>到kafka指定topic中
type Option ¶
type Option func(*option)
func WithAutoSubmit ¶
func WithClientID ¶
func WithConsumerBrokers ¶
func WithConsumerGroup ¶
func WithConsumerTopics ¶
func WithPassword ¶
func WithProducerBrokers ¶
func WithProducerRequiredAcks ¶
func WithProducerRequiredAcks(acks sarama.RequiredAcks) Option
func WithProducerRetryMax ¶
func WithUsername ¶
func WithVersion ¶
func WithVersion(version sarama.KafkaVersion) Option
type StringEncoder ¶
type StringEncoder string
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
Click to show internal directories.
Click to hide internal directories.