Documentation
Constants ¶
This section is empty.
Variables ¶
var Addresses = flag.String("kafkaaddrs", "localhost:9092", "kafka's comma-separated addresses")
Addresses is the flag for Kafka's comma-separated addresses.
Topic is the flag for the Kafka topic.
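Because Addresses holds a single comma-separated string, callers will usually want to split it into individual addresses after flag parsing. A minimal sketch of that step follows; `splitAddrs` is a hypothetical helper, not part of this package, and the local `addresses` variable only mirrors the documented flag declaration so the example compiles on its own.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// splitAddrs is a hypothetical helper (not part of the documented package).
// It splits a comma-separated address list, trims surrounding whitespace,
// and drops empty entries.
func splitAddrs(raw string) []string {
	var addrs []string
	for _, a := range strings.Split(raw, ",") {
		if a = strings.TrimSpace(a); a != "" {
			addrs = append(addrs, a)
		}
	}
	return addrs
}

func main() {
	// Local stand-in mirroring the package's Addresses flag declaration.
	addresses := flag.String("kafkaaddrs", "localhost:9092", "kafka's comma-separated addresses")
	flag.Parse()

	for _, addr := range splitAddrs(*addresses) {
		fmt.Println("kafka address:", addr)
	}
}
```

Trimming and skipping empty entries is a design choice of this sketch; it tolerates input such as `-kafkaaddrs "a:9092, b:9092,"`.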
Functions ¶
Types ¶
type BaseEncoder ¶
type BaseEncoder struct {
// contains filtered or unexported fields
}
BaseEncoder implements the MessageEncoder interface and mainly handles monitoring.
func NewBaseEncoder ¶
func NewBaseEncoder(typ string) *BaseEncoder
NewBaseEncoder returns a new base MessageEncoder of the given type.
func (*BaseEncoder) Encode ¶
func (e *BaseEncoder) Encode(message proto.Message) ([]*sarama.ProducerMessage, error)
Encode encodes the proto message into sarama.ProducerMessage values.
func (*BaseEncoder) HandleError ¶
func (e *BaseEncoder) HandleError(msg *sarama.ProducerError)
HandleError processes the metadata of messages received from the Kafka producer's Errors channel.
func (*BaseEncoder) HandleSuccess ¶
func (e *BaseEncoder) HandleSuccess(msg *sarama.ProducerMessage)
HandleSuccess processes the metadata of messages received from the Kafka producer's Successes channel.
type MessageEncoder ¶
type MessageEncoder interface {
	Encode(proto.Message) ([]*sarama.ProducerMessage, error)
	HandleSuccess(*sarama.ProducerMessage)
	HandleError(*sarama.ProducerError)
}
MessageEncoder is an interface for encoding a proto.Message into sarama.ProducerMessage values and for handling the results reported on the producer's Successes and Errors channels.
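The following sketch illustrates how an implementation of an interface with this shape might be wired to a producer's result channels. To stay compilable without external modules it uses simplified local stand-ins: `Message` for proto.Message, and `ProducerMessage` and `ProducerError` for the sarama types. `countingEncoder`, `text`, and `drain` are hypothetical names, and none of this is the package's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins (assumptions) for proto.Message, sarama.ProducerMessage
// and sarama.ProducerError, reduced to the fields this sketch needs.
type Message interface{ String() string }

type ProducerMessage struct {
	Topic string
	Value []byte
}

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

// MessageEncoder mirrors the shape of the documented interface.
type MessageEncoder interface {
	Encode(Message) ([]*ProducerMessage, error)
	HandleSuccess(*ProducerMessage)
	HandleError(*ProducerError)
}

// countingEncoder is a hypothetical implementation that counts outcomes.
type countingEncoder struct {
	topic     string
	succeeded int
	failed    int
}

func (e *countingEncoder) Encode(m Message) ([]*ProducerMessage, error) {
	if m == nil {
		return nil, errors.New("nil message")
	}
	return []*ProducerMessage{{Topic: e.topic, Value: []byte(m.String())}}, nil
}

func (e *countingEncoder) HandleSuccess(*ProducerMessage) { e.succeeded++ }
func (e *countingEncoder) HandleError(*ProducerError)     { e.failed++ }

// text is a trivial Message used for the demonstration.
type text string

func (t text) String() string { return string(t) }

// drain routes delivery results to the encoder until both channels are closed.
func drain(enc MessageEncoder, successes <-chan *ProducerMessage, errs <-chan *ProducerError) {
	for successes != nil || errs != nil {
		select {
		case m, ok := <-successes:
			if !ok {
				successes = nil // a nil channel is never selected again
				continue
			}
			enc.HandleSuccess(m)
		case e, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			enc.HandleError(e)
		}
	}
}

func main() {
	enc := &countingEncoder{topic: "events"}
	msgs, err := enc.Encode(text("hello"))
	if err != nil {
		panic(err)
	}

	// Simulate one successful and one failed delivery.
	successes := make(chan *ProducerMessage, 1)
	errs := make(chan *ProducerError, 1)
	successes <- msgs[0]
	errs <- &ProducerError{Msg: msgs[0], Err: errors.New("broker unavailable")}
	close(successes)
	close(errs)

	drain(enc, successes, errs)
	fmt.Println("succeeded:", enc.succeeded, "failed:", enc.failed)
}
```

With a real sarama async producer, the same loop would read from the producer's Successes and Errors channels instead of the simulated ones.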