Documentation ¶
Index ¶
- func CreateTopic(addr string, topic string, numPartitions, replicationFactor int) error
- func DeleteTopic(addr string, topics ...string) error
- func NewBroker(opts ...broker.Option) broker.Broker
- func WithAllowPublishAutoTopicCreation(enable bool) broker.Option
- func WithAsync(enable bool) broker.Option
- func WithBatchBytes(by int64) broker.Option
- func WithBatchSize(size int) broker.Option
- func WithBatchTimeout(timeout time.Duration) broker.Option
- func WithCommitInterval(interval time.Duration) broker.Option
- func WithCrc32Balancer(consistent bool) broker.PublishOption
- func WithDialer(cfg *kafkaGo.Dialer) broker.Option
- func WithDialerTimeout(tm time.Duration) broker.Option
- func WithEnableErrorLogger(enable bool) broker.Option
- func WithEnableLogger(enable bool) broker.Option
- func WithEnableOneTopicOneWriter(enable bool) broker.Option
- func WithErrorLogger(l kafkaGo.Logger) broker.Option
- func WithHashBalancer(hasher hash.Hash32) broker.PublishOption
- func WithHeaders(headers map[string]interface{}) broker.PublishOption
- func WithHeartbeatInterval(interval time.Duration) broker.Option
- func WithLeastBytesBalancer() broker.PublishOption
- func WithLogger(l kafkaGo.Logger) broker.Option
- func WithMaxAttempts(cnt int) broker.Option
- func WithMaxBytes(bytes int) broker.Option
- func WithMaxWait(time time.Duration) broker.Option
- func WithMessageKey(key []byte) broker.PublishOption
- func WithMessageOffset(offset int64) broker.PublishOption
- func WithMinBytes(bytes int) broker.Option
- func WithMurmur2Balancer(consistent bool) broker.PublishOption
- func WithPartitionWatchInterval(interval time.Duration) broker.Option
- func WithPlainMechanism(username, password string) broker.Option
- func WithPublishMaxAttempts(cnt int) broker.Option
- func WithQueueCapacity(cap int) broker.Option
- func WithReadLagInterval(interval time.Duration) broker.Option
- func WithReadTimeout(timeout time.Duration) broker.Option
- func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option
- func WithRebalanceTimeout(timeout time.Duration) broker.Option
- func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption
- func WithRetentionTime(time time.Duration) broker.Option
- func WithRetries(cnt int) broker.Option
- func WithRoundRobinBalancer() broker.PublishOption
- func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option
- func WithSessionTimeout(timeout time.Duration) broker.Option
- func WithStartOffset(offset int64) broker.Option
- func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption
- func WithWatchPartitionChanges(enable bool) broker.Option
- func WithWriteTimeout(timeout time.Duration) broker.Option
- func WithWriterConfig(cfg WriterConfig) broker.Option
- type BalancerName
- type ErrorLogger
- type Logger
- type MessageCarrier
- type ScramAlgorithm
- type Writer
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateTopic ¶ added in v2.0.7
func DeleteTopic ¶ added in v2.0.7
func WithAllowPublishAutoTopicCreation ¶ added in v2.0.7
WithAllowPublishAutoTopicCreation enables or disables automatic creation of a missing topic when publishing.
func WithBatchBytes ¶
WithBatchBytes sets the maximum size, in bytes, of a request before it is sent to a partition. Default: 1048576 bytes.
func WithBatchSize ¶
WithBatchSize sets the target number of messages buffered before a batch is sent to a partition (batch.size). Default: 100.
func WithBatchTimeout ¶
WithBatchTimeout sets how long incomplete message batches may wait before being flushed to Kafka (linger.ms). Default: 10ms.
func WithCommitInterval ¶
WithCommitInterval .
func WithCrc32Balancer ¶
func WithCrc32Balancer(consistent bool) broker.PublishOption
WithCrc32Balancer .
func WithEnableErrorLogger ¶
WithEnableErrorLogger enables or disables the go-micro error logger.
func WithEnableLogger ¶
WithEnableLogger enables or disables the go-micro info logger.
func WithEnableOneTopicOneWriter ¶
WithEnableOneTopicOneWriter .
func WithErrorLogger ¶
WithErrorLogger injects an error logger.
func WithHashBalancer ¶
func WithHashBalancer(hasher hash.Hash32) broker.PublishOption
WithHashBalancer .
func WithHeaders ¶
func WithHeaders(headers map[string]interface{}) broker.PublishOption
WithHeaders .
func WithHeartbeatInterval ¶
WithHeartbeatInterval .
func WithLeastBytesBalancer ¶
func WithLeastBytesBalancer() broker.PublishOption
WithLeastBytesBalancer .
func WithMessageOffset ¶
func WithMessageOffset(offset int64) broker.PublishOption
WithMessageOffset .
func WithMurmur2Balancer ¶
func WithMurmur2Balancer(consistent bool) broker.PublishOption
WithMurmur2Balancer .
func WithPartitionWatchInterval ¶
WithPartitionWatchInterval .
func WithPlainMechanism ¶
WithPlainMechanism .
func WithPublishMaxAttempts ¶
WithPublishMaxAttempts .
func WithReadLagInterval ¶
WithReadLagInterval .
func WithReadTimeout ¶
WithReadTimeout sets the read timeout. Default: 10s.
func WithReaderConfig ¶
func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option
WithReaderConfig .
func WithRebalanceTimeout ¶
WithRebalanceTimeout .
func WithReferenceHashBalancer ¶
func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption
WithReferenceHashBalancer .
func WithRetentionTime ¶
WithRetentionTime .
func WithRoundRobinBalancer ¶
func WithRoundRobinBalancer() broker.PublishOption
WithRoundRobinBalancer .
func WithScramMechanism ¶ added in v2.0.7
func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option
WithScramMechanism .
func WithSessionTimeout ¶
WithSessionTimeout .
func WithSubscribeAutoCreateTopic ¶ added in v2.0.7
func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption
func WithWatchPartitionChanges ¶
WithWatchPartitionChanges .
func WithWriteTimeout ¶
WithWriteTimeout sets the write timeout. Default: 10s.
Types ¶
type BalancerName ¶ added in v2.0.7
type BalancerName string
const ( LeastBytesBalancer BalancerName = "LeastBytes" RoundRobinBalancer BalancerName = "RoundRobin" HashBalancer BalancerName = "Hash" ReferenceHashBalancer BalancerName = "ReferenceHash" Crc32Balancer BalancerName = "CRC32Balancer" Murmur2Balancer BalancerName = "Murmur2Balancer" )
type ErrorLogger ¶
type ErrorLogger struct { }
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier
NewMessageCarrier .
type ScramAlgorithm ¶ added in v2.0.7
type ScramAlgorithm string
const ( ScramAlgorithmSHA256 ScramAlgorithm = "SHA256" ScramAlgorithmSHA512 ScramAlgorithm = "SHA512" )
type Writer ¶
type Writer struct { Writer *kafkaGo.Writer Writers map[string]*kafkaGo.Writer EnableOneTopicOneWriter bool }
func (*Writer) Close ¶
func (w *Writer) Close()
Close flushes pending writes and waits for all writes to complete before returning.
func (*Writer) CreateProducer ¶
func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer
CreateProducer creates a kafka-go Writer.
type WriterConfig ¶
type WriterConfig struct { // The list of broker addresses used to connect to the kafka cluster. Brokers []string // The balancer used to distribute messages across partitions. // // The default is to use a round-robin distribution. Balancer kafkaGo.Balancer // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int // Limit the maximum size of a request in bytes before being sent to // a partition. // // The default is to use a kafka default value of 1048576. BatchBytes int64 // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. WriteTimeout time.Duration // Number of acknowledges from partition replicas required before receiving // a response to a produce request. The default is -1, which means to wait for // all replicas, and a value above 0 is required to indicate how many replicas // should acknowledge a message to be considered successful. // // This version of kafka-go (v0.3) does not support 0 required acks, due to // some internal complexity implementing this with the Kafka protocol. If you // need that functionality specifically, you'll need to upgrade to v0.4. RequiredAcks kafkaGo.RequiredAcks // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. 
Async bool // If not nil, specifies a logger used to report internal changes within the // Writer. Logger kafkaGo.Logger // ErrorLogger is the logger used to report errors. If nil, the Writer falls // back to using Logger instead. ErrorLogger kafkaGo.Logger // AllowAutoTopicCreation notifies Writer to create topic if missing. AllowAutoTopicCreation bool }