producer

package
v0.5.4
Warning

This package is not in the latest version of its module.

Published: Jul 7, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(input ProducerInput) *KafkaProducer

func (*KafkaProducer) Close added in v0.5.3

func (kp *KafkaProducer) Close()

func (*KafkaProducer) GetSaramaProducer added in v0.5.3

func (kp *KafkaProducer) GetSaramaProducer() sarama.AsyncProducer

func (*KafkaProducer) InitSaramaProducer added in v0.5.3

func (kp *KafkaProducer) InitSaramaProducer() error

func (*KafkaProducer) PublishIPFIXMessages added in v0.5.3

func (kp *KafkaProducer) PublishIPFIXMessages(msgCh chan *entities.Message)

PublishIPFIXMessages takes a message channel as input and converts every message on it to flow messages in the proto schema. The function exits when the input message channel is closed.
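The exit-on-close behaviour follows the usual Go pattern of ranging over a channel. The following is a minimal, stdlib-only sketch of that pattern, not the package's implementation: `message`, `flowMessage`, `convert`, and `publishAll` are hypothetical stand-ins for `*entities.Message`, the proto flow message, and the internal conversion and publish steps.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *entities.Message.
type message struct {
	records []string
}

// flowMessage is a hypothetical stand-in for a flow message in proto schema.
type flowMessage struct {
	payload string
}

// convert turns every record of one input message into a flow message.
func convert(m *message) []flowMessage {
	out := make([]flowMessage, 0, len(m.records))
	for _, r := range m.records {
		out = append(out, flowMessage{payload: r})
	}
	return out
}

// publishAll drains msgCh and returns once the channel is closed and empty.
func publishAll(msgCh chan *message, send func(flowMessage)) {
	for m := range msgCh {
		for _, fm := range convert(m) {
			send(fm)
		}
	}
}

func main() {
	msgCh := make(chan *message, 2)
	msgCh <- &message{records: []string{"flow-a", "flow-b"}}
	msgCh <- &message{records: []string{"flow-c"}}
	close(msgCh) // closing the channel is what lets publishAll return

	var sent []string
	publishAll(msgCh, func(fm flowMessage) { sent = append(sent, fm.payload) })
	fmt.Println(len(sent), sent)
}
```

In a real caller the function would typically run in its own goroutine, with the sender closing the channel to signal shutdown.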

func (*KafkaProducer) PublishRecord added in v0.5.3

func (kp *KafkaProducer) PublishRecord(record entities.Record)

func (*KafkaProducer) SendFlowMessage

func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitMsgWithLen bool)

SendFlowMessage takes the flow message in proto schema, encodes it, and sends it on the producer channel. If kafkaDelimitMsgWithLen is set to true, the encoded message is length-prefixed.
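Length-prefixing lets a consumer split a byte stream back into individual messages. The exact prefix format is not stated on this page; the sketch below assumes an unsigned varint prefix, the convention commonly used for delimited protobuf streams. `delimitWithLen` and `readDelimited` are hypothetical helpers written for illustration and use only the standard library.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// delimitWithLen prepends the payload length as an unsigned varint.
// The varint prefix is an assumption made for this sketch.
func delimitWithLen(payload []byte) []byte {
	prefix := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(prefix, uint64(len(payload)))
	return append(prefix[:n], payload...)
}

// readDelimited splits one length-prefixed message off the front of buf
// and returns the message plus the unread remainder.
func readDelimited(buf []byte) (msg, rest []byte, err error) {
	size, n := binary.Uvarint(buf)
	if n <= 0 {
		return nil, nil, fmt.Errorf("invalid length prefix")
	}
	if uint64(len(buf)-n) < size {
		return nil, nil, fmt.Errorf("truncated message: want %d bytes, have %d", size, len(buf)-n)
	}
	end := n + int(size)
	return buf[n:end], buf[end:], nil
}

func main() {
	// Two messages concatenated into one stream, then split apart again.
	stream := append(delimitWithLen([]byte("first")), delimitWithLen([]byte("second"))...)
	for len(stream) > 0 {
		msg, rest, err := readDelimited(stream)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%d bytes: %s\n", len(msg), msg)
		stream = rest
	}
}
```

Without the prefix, each Kafka message value would have to carry exactly one encoded flow message for a consumer to decode it unambiguously.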

func (*KafkaProducer) SetSaramaProducer added in v0.5.3

func (kp *KafkaProducer) SetSaramaProducer(producer sarama.AsyncProducer)

SetSaramaProducer is needed only in tests, to replace the Sarama producer with a mock producer.
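The setter exists so that a test can swap in a producer that never contacts a broker. The general idea can be sketched with the standard library alone; everything here (`asyncProducer`, `fakeProducer`, `publisher`, `setProducer`) is hypothetical and far smaller than `sarama.AsyncProducer` or `KafkaProducer`.

```go
package main

import "fmt"

// asyncProducer is a hypothetical, much-reduced stand-in for sarama.AsyncProducer.
type asyncProducer interface {
	Input() chan<- string
}

// fakeProducer records what is sent to it instead of contacting a broker.
type fakeProducer struct {
	ch chan string
}

func (f *fakeProducer) Input() chan<- string { return f.ch }

// publisher is a hypothetical wrapper with a test-only setter.
type publisher struct {
	producer asyncProducer
}

// setProducer replaces the underlying producer, as a test would do with a mock.
func (p *publisher) setProducer(ap asyncProducer) { p.producer = ap }

// publish forwards one value to whichever producer is currently set.
func (p *publisher) publish(v string) { p.producer.Input() <- v }

func main() {
	fake := &fakeProducer{ch: make(chan string, 1)}
	p := &publisher{}
	p.setProducer(fake)
	p.publish("flow-record")
	fmt.Println(<-fake.ch) // the test can now inspect what was published
}
```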

type ProducerInput added in v0.5.3

type ProducerInput struct {
	// KafkaBrokers is a list of addresses of Kafka broker systems
	KafkaBrokers         []string
	KafkaVersion         sarama.KafkaVersion
	KafkaTopic           string
	KafkaProtoSchema     string
	KafkaTLSEnabled      bool
	KafkaCAFile          string
	KafkaTLSCertFile     string
	KafkaTLSKeyFile      string
	KafkaTLSSkipVerify   bool
	KafkaLogErrors       bool
	KafkaLogSuccesses    bool
	EnableSaramaDebugLog bool
}
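How the TLS fields are consumed is not documented on this page. As a rough illustration, the sketch below assumes that KafkaCAFile, KafkaTLSCertFile, KafkaTLSKeyFile, and KafkaTLSSkipVerify map onto a standard `tls.Config` in the conventional way. `buildTLSConfig` is a hypothetical helper, not part of the package.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper showing one conventional mapping
// from file paths and a skip-verify flag to a *tls.Config.
func buildTLSConfig(caFile, certFile, keyFile string, skipVerify bool) (*tls.Config, error) {
	cfg := &tls.Config{InsecureSkipVerify: skipVerify}

	// Trust a custom CA only when a CA file is given.
	if caFile != "" {
		pem, err := os.ReadFile(caFile)
		if err != nil {
			return nil, fmt.Errorf("reading CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("no certificates found in %s", caFile)
		}
		cfg.RootCAs = pool
	}

	// Present a client certificate only when both halves of the pair are given.
	if certFile != "" && keyFile != "" {
		cert, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, fmt.Errorf("loading client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig("", "", "", true)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("skip verify:", cfg.InsecureSkipVerify)
}
```

Skipping verification disables server certificate checks, so a flag such as KafkaTLSSkipVerify is generally best kept to test environments.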
