producer

package
v0.5.0
Warning

This package is not in the latest version of its module.

Published: Apr 19, 2021 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	KafkaConfigVersion sarama.KafkaVersion
)

Functions

This section is empty.

Types

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func InitKafkaProducer

func InitKafkaProducer(addrs []string, topic string, protoSchema string, logErrors bool) (*KafkaProducer, error)

InitKafkaProducer initializes a KafkaProducer with the given broker addresses and other Kafka config parameters.

func NewKafkaProducer

func NewKafkaProducer(asyncProducer sarama.AsyncProducer, topic string, schemaType string) *KafkaProducer

func (*KafkaProducer) Publish

func (kp *KafkaProducer) Publish(msgCh chan *entities.Message)

Publish takes a message channel as input and converts every message received on it to a flow message in the proto schema. It returns when the input message channel is closed.
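The exit-on-close behavior described above can be sketched with a plain range loop over the channel. This is only an illustration of the pattern, not the package's implementation: the message type, the publishAll function, and the send callback are hypothetical stand-ins for *entities.Message and the conversion and send steps.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *entities.Message.
type message struct {
	ID int
}

// publishAll hands every message received on msgCh to send and returns
// the number of messages handled. The range loop ends only when msgCh
// is closed, which mirrors the documented exit condition of Publish.
func publishAll(msgCh chan *message, send func(*message)) int {
	handled := 0
	for msg := range msgCh {
		send(msg)
		handled++
	}
	return handled
}

func main() {
	msgCh := make(chan *message)
	done := make(chan int)

	// Run the consumer in its own goroutine, as a caller of a blocking
	// publish loop would typically do.
	go func() {
		done <- publishAll(msgCh, func(m *message) {
			fmt.Println("sent message", m.ID)
		})
	}()

	for i := 1; i <= 3; i++ {
		msgCh <- &message{ID: i}
	}
	close(msgCh) // closing the channel is what lets publishAll return

	fmt.Println("messages handled:", <-done)
}
```

Because the loop blocks until the channel is closed, a caller would normally run it in a goroutine and close the channel to shut it down.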

func (*KafkaProducer) SendFlowMessage

func (kp *KafkaProducer) SendFlowMessage(msg *protobuf.FlowMessage, kafkaDelimitMsgWithLen bool)

SendFlowMessage takes a flow message in the proto schema, encodes it, and sends it on the producer channel. If kafkaDelimitMsgWithLen is set to true, the encoded message is prefixed with its length.
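Length prefixing lets a consumer split a byte stream back into individual messages. The sketch below assumes a uvarint prefix, which is the common convention for delimited protobuf messages; the documentation above does not state which prefix format this package uses. The delimit and split helpers are hypothetical and not part of the package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// delimit prepends the payload length, encoded as a uvarint, to payload.
// The uvarint format is an assumption made for this sketch.
func delimit(payload []byte) []byte {
	prefix := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(prefix, uint64(len(payload)))
	return append(prefix[:n], payload...)
}

// split reads one length-prefixed message from the front of buf and
// returns its payload together with the remaining bytes.
func split(buf []byte) (payload, rest []byte, err error) {
	size, n := binary.Uvarint(buf)
	if n <= 0 {
		return nil, nil, fmt.Errorf("invalid length prefix")
	}
	if uint64(len(buf)-n) < size {
		return nil, nil, fmt.Errorf("truncated message: want %d bytes, have %d", size, len(buf)-n)
	}
	end := n + int(size)
	return buf[n:end], buf[end:], nil
}

func main() {
	// Two already-encoded messages written back to back.
	stream := append(delimit([]byte("flow-1")), delimit([]byte("flow-two"))...)

	// The consumer recovers the message boundaries from the prefixes.
	for len(stream) > 0 {
		payload, rest, err := split(stream)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%d bytes: %s\n", len(payload), payload)
		stream = rest
	}
}
```

Without the prefix, each Kafka record would have to carry exactly one message for the consumer to decode it unambiguously.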
