Documentation ¶
Overview ¶
Package gxkafka encapsulates some kafka functions based on github.com/Shopify/sarama. MOD : 2016-06-01 05:57
Package gxkafka encapsulates some kafka functions based on github.com/Shopify/sarama. MOD : 2016-06-01 18:00
Index ¶
Constants ¶
const (
	OFFSETS_PROCESSING_TIMEOUT_SECONDS = 10e9
	OFFSETS_COMMIT_INTERVAL            = 10e9
)
const (
	HASH = iota + 1
	RANDOM
)
Variables ¶
This section is empty.
Functions ¶
func GetBrokerList ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer interface {
	SendMessage(topic string, key interface{}, message interface{}, metadata interface{}) error
	SendBytes(topic string, key []byte, message []byte, metadata interface{})
	Start()
	Stop()
	Terminate()
}
AsyncProducer is an interface for sending messages to Kafka asynchronously.
func NewAsyncProducer ¶
func NewAsyncProducer(
	clientID string,
	brokers []string,
	partitionMethod int,
	waitForAllAck bool,
	updateMetaDataInterval int,
	compressionType sarama.CompressionCodec,
	successfulMessageCallback ProducerMessageCallback,
	errorCallback ProducerErrorCallback,
) (AsyncProducer, error)
NewAsyncProducer constructs a new AsyncProducer for the given broker addresses.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
@updateMetaDataInterval is in seconds. It is used to keep the socket connection alive, so its value should be less than connections.max.idle.ms (which is in milliseconds).
@compressionType: note that the Kafka version must be >= V0_10_0_0 if you want to use CompressionLZ4.
type Consumer ¶
type Consumer interface {
	Start() error
	Commit(*sarama.ConsumerMessage)
	Stop()
}
Consumer is the interface for consuming messages from Kafka. Incoming messages are delivered through a callback function (see ConsumerMessageCallback).
func NewConsumer ¶
func NewConsumer(
	clientID string,
	brokers []string,
	topicList []string,
	consumerGroup string,
	msgCb ConsumerMessageCallback,
	errCb ConsumerErrorCallback,
	ntfCb ConsumerNotificationCallback,
) (Consumer, error)
NewConsumer constructs a consumer.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
The following explanation is deprecated (2017-03-07): NewConsumer could not take brokers directly as a parameter because /wvanderbergen/kafka/consumer uses consumer groups, and each consumer's information has to be stored in ZooKeeper.
type ConsumerErrorCallback ¶
type ConsumerErrorCallback func(error)
Consumer invokes @ConsumerErrorCallback when it encounters an error.
type ConsumerMessageCallback ¶
type ConsumerMessageCallback func(msg *sarama.ConsumerMessage, preOffset int64)
Consumer invokes @ConsumerMessageCallback when it receives a message.
@msg: the consumed message.
@preOffset: the offset of the message that preceded @msg in the same partition.
If @msg is the partition's first message, its preOffset is 0.
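One possible use of @preOffset is to detect skipped offsets inside a callback. The sketch below assumes that use case; the page does not say what the parameter is intended for. The message struct is a stand-in for the few fields of *sarama.ConsumerMessage that matter here, and hasGap is a hypothetical helper, not part of gxkafka.

```go
package main

import "fmt"

// message is a stand-in for *sarama.ConsumerMessage, reduced to the
// fields this sketch needs.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// hasGap reports whether offsets appear to have been skipped between the
// previous message (preOffset) and msg. A preOffset of 0 is treated as
// "no previous message", following the documented convention, which means
// a real previous offset of 0 cannot be told apart from the first message.
// Compacted topics can have legitimate gaps, so a true result is a hint,
// not proof of message loss.
func hasGap(msg message, preOffset int64) bool {
	if preOffset == 0 {
		return false
	}
	return msg.Offset != preOffset+1
}

func main() {
	first := message{Topic: "orders", Partition: 0, Offset: 41}
	next := message{Topic: "orders", Partition: 0, Offset: 42}
	late := message{Topic: "orders", Partition: 0, Offset: 45}

	fmt.Println(hasGap(first, 0))  // first message seen on the partition
	fmt.Println(hasGap(next, 41))  // contiguous with the previous offset
	fmt.Println(hasGap(late, 42))  // offsets 43 and 44 were not seen
}
```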
type ConsumerNotificationCallback ¶
type ConsumerNotificationCallback func(*sc.Notification)
Consumer invokes @ConsumerNotificationCallback when it receives a notification.
type Producer ¶
type Producer interface {
	SendMessage(topic string, key interface{}, message interface{}) (int32, int64, error)
	SendBytes(topic string, key []byte, message []byte) (int32, int64, error)
	Stop()
}
Producer is an interface for sending messages to Kafka.
func NewProducer ¶
func NewProducer(
	clientID string,
	brokers []string,
	partitionMethod int,
	waitForAllAck bool,
	updateMetaDataInterval int,
	compressionType sarama.CompressionCodec,
) (Producer, error)
NewProducer constructs a new SyncProducer for the given broker addresses.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
@updateMetaDataInterval is in seconds. It is used to keep the socket connection alive, so its value should be less than connections.max.idle.ms (which is in milliseconds).
@compressionType: note that the Kafka version must be >= V0_10_0_0 if you want to use CompressionLZ4.
type ProducerErrorCallback ¶
type ProducerErrorCallback func(*sarama.ProducerError)
AsyncProducer invokes @ProducerErrorCallback when it receives an error response for a message.
type ProducerMessageCallback ¶
type ProducerMessageCallback func(*sarama.ProducerMessage)
AsyncProducer invokes @ProducerMessageCallback when it receives a success response for a message.