Documentation ¶
Overview ¶
Example ¶
package main import ( "fmt" "os" "os/signal" "sync/atomic" "time" "github.com/Shopify/sarama" "github.com/boxgo/box/pkg/client/kafka" ) const ( testTopic = "wechat_event" ) func main() { kfk := kafka.StdConfig("default").Build() producer, err := kfk.NewSyncProducer() if err != nil { panic(err) } defer func() { if err := producer.Close(); err != nil { panic(err) } }() consumer, err := kfk.NewConsumer() if err != nil { panic(err) } defer func() { if err := consumer.Close(); err != nil { panic(err) } }() partitionConsumer, err := consumer.ConsumePartition(testTopic, 0, sarama.OffsetNewest) if err != nil { panic(err) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) var cnt int32 go func() { for { select { case <-partitionConsumer.Messages(): atomic.AddInt32(&cnt, 1) case <-signals: break } } }() partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{ Topic: testTopic, Value: sarama.StringEncoder("hi"), }) if err != nil { panic(err) } time.Sleep(time.Second) fmt.Println(offset >= 0, partition == 0, atomic.LoadInt32(&cnt) > 0) }
Output: true true true
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncSyncProducer ¶
// AsyncSyncProducer is a named type whose underlying type is
// sarama.AsyncProducer; it is the value returned by Kafka.NewAsyncProducer.
// NOTE(review): the name looks like a slip for "AsyncProducer"; renaming it
// would break callers, so it is only flagged here.
type AsyncSyncProducer sarama.AsyncProducer
type Config ¶
type Config struct { Addrs []string `config:"addrs"` Net Net `config:"net"` Metadata Metadata `config:"metadata"` Producer ProducerConfig `config:"producer"` Consumer ConsumerConfig `config:"consumer"` // contains filtered or unexported fields }
Config is the configuration for the Kafka client.
type ConsumerConfig ¶
// ConsumerConfig groups the consumer-side settings. Each field is bound to the
// key named in its `config` tag; the function-typed and interceptor fields are
// tagged "-" (presumably excluded from config loading and set in code —
// confirm against the config loader).
// NOTE(review): the field names appear to mirror sarama's Config.Consumer
// options (group session/heartbeat/rebalance, fetch sizes, offsets) — confirm
// the mapping in the code that builds the sarama config.
type ConsumerConfig struct {
	GroupSessionTimeout        time.Duration                   `config:"groupSessionTimeout"`
	GroupHeartbeatInterval     time.Duration                   `config:"groupHeartbeatInterval"`
	GroupRebalanceStrategy     sarama.BalanceStrategy          `config:"groupRebalanceStrategy"`
	GroupRebalanceTimeout      time.Duration                   `config:"groupRebalanceTimeout"`
	GroupRebalanceRetryMax     int                             `config:"groupRebalanceRetryMax"`
	GroupRebalanceRetryBackoff time.Duration                   `config:"groupRebalanceRetryBackoff"`
	GroupMemberUserData        []byte                          `config:"groupMemberUserData"`
	RetryBackoff               time.Duration                   `config:"retryBackoff"`
	RetryBackoffFunc           func(retries int) time.Duration `config:"-"`
	FetchMin                   int32                           `config:"fetchMin"`
	FetchMax                   int32                           `config:"fetchMax"`
	FetchDefault               int32                           `config:"fetchDefault"`
	MaxWaitTime                time.Duration                   `config:"maxWaitTime"`
	MaxProcessingTime          time.Duration                   `config:"maxProcessingTime"`
	ReturnErrors               bool                            `config:"returnErrors"`
	OffsetsCommitInterval      time.Duration                   `config:"offsetsCommitInterval"`
	OffsetsInitial             int64                           `config:"offsetsInitial"`
	OffsetsRetention           time.Duration                   `config:"offsetsRetention"`
	OffsetRetryMax             int                             `config:"offsetRetryMax"`
	OffsetAutoCommitEnable     bool                            `config:"offsetAutoCommitEnable"`
	OffsetAutoCommitInterval   time.Duration                   `config:"offsetAutoCommitInterval"`
	IsolationLevel             sarama.IsolationLevel           `config:"isolationLevel"`
	Interceptors               []sarama.ConsumerInterceptor    `config:"-"`
}
type ConsumerGroup ¶
// ConsumerGroup is a named type whose underlying type is sarama.ConsumerGroup;
// it is the value returned by Kafka.NewConsumerGroup.
type ConsumerGroup sarama.ConsumerGroup
type Kafka ¶
// Kafka is the type whose methods (NewSyncProducer, NewAsyncProducer,
// NewConsumer, NewConsumerGroup) return producers and consumers. All of its
// fields are unexported. The package example obtains one through
// kafka.StdConfig("default").Build() — presumably the intended constructor;
// confirm against the package source.
type Kafka struct {
	// contains filtered or unexported fields
}
func (Kafka) NewAsyncProducer ¶
// NewAsyncProducer returns an AsyncSyncProducer (underlying type
// sarama.AsyncProducer) or an error. Presumably the producer is configured
// from the settings this Kafka value was built with — the body is not shown
// here, confirm against the implementation.
func (kfk Kafka) NewAsyncProducer() (AsyncSyncProducer, error)
func (Kafka) NewConsumer ¶
func (Kafka) NewConsumerGroup ¶
// NewConsumerGroup returns a ConsumerGroup (underlying type
// sarama.ConsumerGroup) or an error. groupID presumably names the Kafka
// consumer group to join — the body is not shown here, confirm against the
// implementation.
func (kfk Kafka) NewConsumerGroup(groupID string) (ConsumerGroup, error)
func (Kafka) NewSyncProducer ¶
// NewSyncProducer returns a SyncProducer (underlying type sarama.SyncProducer)
// or an error. The package example uses the result to call SendMessage and
// closes it with Close when done.
func (kfk Kafka) NewSyncProducer() (SyncProducer, error)
type Metadata ¶
// Metadata groups the metadata-related settings. Each field is bound to the
// key named in its `config` tag; RetryBackoffFunc is tagged "-" (presumably
// excluded from config loading and set in code — confirm against the config
// loader).
// NOTE(review): the field names appear to mirror sarama's Config.Metadata
// options — confirm the mapping in the code that builds the sarama config.
type Metadata struct {
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	RefreshFrequency time.Duration                               `config:"refreshFrequency"`
	Full             bool                                        `config:"full"`
	Timeout          time.Duration                               `config:"timeout"`
}
type ProducerConfig ¶
// ProducerConfig groups the producer-side settings. Each field is bound to the
// key named in its `config` tag; Partitioner, RetryBackoffFunc and
// Interceptors are tagged "-" (presumably excluded from config loading and set
// in code — confirm against the config loader).
// NOTE(review): FlushMaxMessages uses the key "FlushMaxMessages" with a
// leading capital, unlike every other key in this struct (lower camelCase).
// If the config loader is case-sensitive, "flushMaxMessages" will not bind.
// The key is left unchanged here because renaming it could break existing
// configuration files.
type ProducerConfig struct {
	MaxMessageBytes  int                                         `config:"maxMessageBytes"`
	RequiredAcks     sarama.RequiredAcks                         `config:"requiredAcks"`
	Timeout          time.Duration                               `config:"timeout"`
	Compression      sarama.CompressionCodec                     `config:"compression"`
	CompressionLevel int                                         `config:"compressionLevel"`
	Partitioner      sarama.PartitionerConstructor               `config:"-"`
	Idempotent       bool                                        `config:"idempotent"`
	ReturnSuccesses  bool                                        `config:"returnSuccesses"`
	ReturnErrors     bool                                        `config:"returnErrors"`
	FlushBytes       int                                         `config:"flushBytes"`
	FlushMessages    int                                         `config:"flushMessages"`
	FlushFrequency   time.Duration                               `config:"flushFrequency"`
	FlushMaxMessages int                                         `config:"FlushMaxMessages"`
	RetryMax         int                                         `config:"retryMax"`
	RetryBackoff     time.Duration                               `config:"retryBackoff"`
	RetryBackoffFunc func(retries, maxRetries int) time.Duration `config:"-"`
	Interceptors     []sarama.ProducerInterceptor                `config:"-"`
}
type SyncProducer ¶
// SyncProducer is a named type whose underlying type is sarama.SyncProducer;
// it is the value returned by Kafka.NewSyncProducer.
type SyncProducer sarama.SyncProducer
Click to show internal directories.
Click to hide internal directories.