Documentation ¶
Index ¶
- type KafkaProducer
- func NewKafkaProducer(input ProducerInput) (*KafkaProducer, error)
- func (kp *KafkaProducer) Close()
- func (kp *KafkaProducer) GetSaramaProducer() sarama.AsyncProducer
- func (kp *KafkaProducer) InitSaramaProducer() error
- func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message)
- func (kp *KafkaProducer) PublishRecord(record entities.Record)
- func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitMsgWithLen bool)
- func (kp *KafkaProducer) SetSaramaProducer(producer sarama.AsyncProducer)
- type ProducerInput
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(input ProducerInput) (*KafkaProducer, error)
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close()
func (*KafkaProducer) GetSaramaProducer ¶
func (kp *KafkaProducer) GetSaramaProducer() sarama.AsyncProducer
func (*KafkaProducer) InitSaramaProducer ¶
func (kp *KafkaProducer) InitSaramaProducer() error
func (*KafkaProducer) PublishIPFIXMessages ¶
func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message)
PublishIPFIXMessages takes in a message channel as input and converts all the messages on the message channel to flow messages in proto schema. This function exits when the input message channel is closed.
func (*KafkaProducer) PublishRecord ¶
func (kp *KafkaProducer) PublishRecord(record entities.Record)
func (*KafkaProducer) SendFlowMessage ¶
func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitMsgWithLen bool)
SendFlowMessage takes in the flow message in proto schema, encodes it, and sends it on the producer channel. If kafkaDelimitMsgWithLen is set to true, the encoded message is length-prefixed before it is sent.
func (*KafkaProducer) SetSaramaProducer ¶
func (kp *KafkaProducer) SetSaramaProducer(producer sarama.AsyncProducer)
SetSaramaProducer is needed only in tests, to set the sarama producer to a mock producer.
type ProducerInput ¶
type ProducerInput struct {
	// KafkaBrokers is a list of addresses of Kafka broker systems
	KafkaBrokers         []string
	KafkaVersion         sarama.KafkaVersion
	KafkaTopic           string
	KafkaTLSEnabled      bool
	KafkaCAFile          string
	KafkaTLSCertFile     string
	KafkaTLSKeyFile      string
	KafkaTLSSkipVerify   bool
	KafkaLogErrors       bool
	KafkaLogSuccesses    bool
	EnableSaramaDebugLog bool
	ProtoSchemaConvertor convertor.IPFIXToKafkaConvertor
}
Click to show internal directories.
Click to hide internal directories.