Documentation ¶
Index ¶
- func CreateTopic(addr string, topic string, numPartitions, replicationFactor int) error
- func DeleteTopic(addr string, topics ...string) error
- func NewBroker(opts ...broker.Option) broker.Broker
- func WithAllowPublishAutoTopicCreation(enable bool) broker.Option
- func WithAsync(enable bool) broker.Option
- func WithBatchBytes(by int64) broker.Option
- func WithBatchSize(size int) broker.Option
- func WithBatchTimeout(timeout time.Duration) broker.Option
- func WithCommitInterval(interval time.Duration) broker.Option
- func WithCompletion(completion func(messages []kafkaGo.Message, err error)) broker.Option
- func WithCrc32Balancer(consistent bool) broker.PublishOption
- func WithDialer(cfg *kafkaGo.Dialer) broker.Option
- func WithDialerTimeout(tm time.Duration) broker.Option
- func WithEnableErrorLogger(enable bool) broker.Option
- func WithEnableLogger(enable bool) broker.Option
- func WithEnableOneTopicOneWriter(enable bool) broker.Option
- func WithErrorLogger(l kafkaGo.Logger) broker.Option
- func WithHashBalancer(hasher hash.Hash32) broker.PublishOption
- func WithHeaders(headers map[string]interface{}) broker.PublishOption
- func WithHeartbeatInterval(interval time.Duration) broker.Option
- func WithLeastBytesBalancer() broker.PublishOption
- func WithLogger(l kafkaGo.Logger) broker.Option
- func WithMaxAttempts(cnt int) broker.Option
- func WithMaxBytes(bytes int) broker.Option
- func WithMaxWait(time time.Duration) broker.Option
- func WithMessageKey(key []byte) broker.PublishOption
- func WithMessageOffset(offset int64) broker.PublishOption
- func WithMinBytes(bytes int) broker.Option
- func WithMurmur2Balancer(consistent bool) broker.PublishOption
- func WithPartitionWatchInterval(interval time.Duration) broker.Option
- func WithPlainMechanism(username, password string) broker.Option
- func WithPublishMaxAttempts(cnt int) broker.Option
- func WithQueueCapacity(cap int) broker.Option
- func WithReadLagInterval(interval time.Duration) broker.Option
- func WithReadTimeout(timeout time.Duration) broker.Option
- func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option
- func WithRebalanceTimeout(timeout time.Duration) broker.Option
- func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption
- func WithRetentionTime(time time.Duration) broker.Option
- func WithRetries(cnt int) broker.Option
- func WithRoundRobinBalancer() broker.PublishOption
- func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option
- func WithSessionTimeout(timeout time.Duration) broker.Option
- func WithStartOffset(offset int64) broker.Option
- func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption
- func WithWatchPartitionChanges(enable bool) broker.Option
- func WithWriteTimeout(timeout time.Duration) broker.Option
- func WithWriterConfig(cfg WriterConfig) broker.Option
- type BalancerName
- type ErrorLogger
- type Logger
- type MessageCarrier
- type ScramAlgorithm
- type Writer
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateTopic ¶ added in v1.2.2
func DeleteTopic ¶ added in v1.2.2
func WithAllowPublishAutoTopicCreation ¶ added in v1.2.3
WithAllowPublishAutoTopicCreation .
func WithCommitInterval ¶
WithCommitInterval .
func WithCompletion ¶ added in v1.2.9
WithCompletion 消息发布完成回调
func WithCrc32Balancer ¶ added in v1.1.0
func WithCrc32Balancer(consistent bool) broker.PublishOption
WithCrc32Balancer CRC32负载均衡器
func WithDialerTimeout ¶ added in v1.0.2
WithDialerTimeout .
func WithEnableErrorLogger ¶ added in v1.1.0
WithEnableErrorLogger enable kratos error logger
func WithEnableLogger ¶ added in v1.1.0
WithEnableLogger enable kratos info logger
func WithEnableOneTopicOneWriter ¶ added in v1.1.0
WithEnableOneTopicOneWriter .
func WithErrorLogger ¶ added in v1.1.0
WithErrorLogger inject error logger
func WithHashBalancer ¶ added in v1.1.0
func WithHashBalancer(hasher hash.Hash32) broker.PublishOption
WithHashBalancer Hash负载均衡器
func WithHeaders ¶
func WithHeaders(headers map[string]interface{}) broker.PublishOption
WithHeaders 消息头
func WithHeartbeatInterval ¶
WithHeartbeatInterval .
func WithLeastBytesBalancer ¶ added in v1.1.0
func WithLeastBytesBalancer() broker.PublishOption
WithLeastBytesBalancer LeastBytes负载均衡器
func WithLogger ¶ added in v1.1.0
WithLogger inject info logger
func WithMessageOffset ¶
func WithMessageOffset(offset int64) broker.PublishOption
WithMessageOffset 消息偏移
func WithMurmur2Balancer ¶ added in v1.1.0
func WithMurmur2Balancer(consistent bool) broker.PublishOption
WithMurmur2Balancer Murmur2负载均衡器
func WithPartitionWatchInterval ¶
WithPartitionWatchInterval .
func WithPlainMechanism ¶ added in v1.0.2
WithPlainMechanism PLAIN认证信息
func WithPublishMaxAttempts ¶
WithPublishMaxAttempts .
func WithReadLagInterval ¶
WithReadLagInterval .
func WithReadTimeout ¶
WithReadTimeout 读取超时时间 default:10s
func WithReaderConfig ¶ added in v1.0.2
func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option
WithReaderConfig .
func WithRebalanceTimeout ¶
WithRebalanceTimeout .
func WithReferenceHashBalancer ¶ added in v1.1.0
func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption
WithReferenceHashBalancer ReferenceHash负载均衡器
func WithRetentionTime ¶
WithRetentionTime .
func WithRoundRobinBalancer ¶ added in v1.1.0
func WithRoundRobinBalancer() broker.PublishOption
WithRoundRobinBalancer RoundRobin负载均衡器,默认均衡器。
func WithScramMechanism ¶ added in v1.0.2
func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option
WithScramMechanism SCRAM认证信息
func WithSessionTimeout ¶
WithSessionTimeout .
func WithSubscribeAutoCreateTopic ¶ added in v1.2.3
func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption
func WithWatchPartitionChanges ¶
WithWatchPartitionChanges .
func WithWriteTimeout ¶
WithWriteTimeout 写入超时时间 default:10s
func WithWriterConfig ¶ added in v1.1.0
func WithWriterConfig(cfg WriterConfig) broker.Option
WithWriterConfig .
Types ¶
type BalancerName ¶ added in v1.2.3
type BalancerName string
const ( LeastBytesBalancer BalancerName = "LeastBytes" RoundRobinBalancer BalancerName = "RoundRobin" HashBalancer BalancerName = "Hash" ReferenceHashBalancer BalancerName = "ReferenceHash" Crc32Balancer BalancerName = "CRC32Balancer" Murmur2Balancer BalancerName = "Murmur2Balancer" )
type ErrorLogger ¶ added in v1.1.0
type ErrorLogger struct { }
func (ErrorLogger) Printf ¶ added in v1.1.0
func (l ErrorLogger) Printf(msg string, args ...interface{})
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier
func (MessageCarrier) Get ¶
func (c MessageCarrier) Get(key string) string
func (MessageCarrier) Keys ¶
func (c MessageCarrier) Keys() []string
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key, val string)
type ScramAlgorithm ¶ added in v1.2.3
type ScramAlgorithm string
const ( ScramAlgorithmSHA256 ScramAlgorithm = "SHA256" ScramAlgorithmSHA512 ScramAlgorithm = "SHA512" )
type Writer ¶ added in v1.1.0
type Writer struct { Writer *kafkaGo.Writer Writers map[string]*kafkaGo.Writer EnableOneTopicOneWriter bool }
func (*Writer) CreateProducer ¶ added in v1.1.0
func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer
CreateProducer create kafka-go Writer
type WriterConfig ¶ added in v1.1.0
type WriterConfig struct { // The list of broker addresses used to connect to the kafka cluster. Brokers []string // The balancer used to distribute messages across partitions. // // The default is to use a round-robin distribution. Balancer kafkaGo.Balancer // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int // Limit the maximum size of a request in bytes before being sent to // a partition. // // The default is to use a kafka default value of 1048576. BatchBytes int64 // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. WriteTimeout time.Duration // Number of acknowledges from partition replicas required before receiving // a response to a produce request. The default is -1, which means to wait for // all replicas, and a value above 0 is required to indicate how many replicas // should acknowledge a message to be considered successful. // // This version of kafka-go (v0.3) does not support 0 required acks, due to // some internal complexity implementing this with the Kafka protocol. If you // need that functionality specifically, you'll need to upgrade to v0.4. RequiredAcks kafkaGo.RequiredAcks // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. Async bool // If not nil, specifies a logger used to report internal changes within the // Writer. Logger kafkaGo.Logger // ErrorLogger is the logger used to report errors. If nil, the Writer falls // back to using Logger instead. ErrorLogger kafkaGo.Logger // AllowAutoTopicCreation notifies Writer to create topic if missing. AllowAutoTopicCreation bool Completion func(messages []kafkaGo.Message, err error) }