Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
KafkaConfigVersion sarama.KafkaVersion
)
Functions ¶
This section is empty.
Types ¶
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func InitKafkaProducer ¶
func InitKafkaProducer(addrs []string, topic string, protoSchema string, logErrors bool) (*KafkaProducer, error)
InitKafkaProducer with broker addresses and other Kafka config parameters.
func NewKafkaProducer ¶
func NewKafkaProducer(asyncProducer sarama.AsyncProducer, topic string, schemaType string) *KafkaProducer
func (*KafkaProducer) Publish ¶
func (kp *KafkaProducer) Publish(msgCh chan *entities.Message)
Publish takes in a message channel as input and converts all the messages on the message channel to flow messages in proto schema. This function exits when the input message channel is closed.
func (*KafkaProducer) SendFlowMessage ¶
func (kp *KafkaProducer) SendFlowMessage(msg *protobuf.FlowMessage, kafkaDelimitMsgWithLen bool)
SendFlowMessage takes in the flow message in proto schema, encodes it and sends it to on the producer channel. If kafkaDelimitMsgWithLen is set to true, it will return a length-prefixed encoded message.
Click to show internal directories.
Click to hide internal directories.