producer

package
v0.0.4 (not the latest version)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

// Batch is an exported struct of the producer package. This listing shows
// only its exported Writer field; the remaining fields are unexported and
// elided. The methods listed for *Batch are AddEvents, Close, FlushMessages
// and StartBatchTicker.
type Batch struct {
	// Writer is a *gokafka.Writer. Presumably the Kafka writer that batched
	// messages are published through — confirm against the package source.
	Writer *gokafka.Writer
	// contains filtered or unexported fields
}

func (*Batch) AddEvents

func (b *Batch) AddEvents(ctx *replication.ListenerContext, messages []gokafka.Message, eventTime time.Time, isLastChunk bool)

func (*Batch) Close

func (b *Batch) Close()

func (*Batch) FlushMessages

func (b *Batch) FlushMessages()

func (*Batch) StartBatchTicker

func (b *Batch) StartBatchTicker()

type Metric

// Metric is the interface returned by NewMetric and by (*Producer).GetMetric.
// Only the method signatures are visible in this listing, so the per-method
// notes below that go beyond the signature are assumptions to be confirmed
// against the implementation.
type Metric interface {
	// SetProcessLatency takes a latency as int64.
	// NOTE(review): the unit (ms, ns, ...) is not shown here — confirm.
	SetProcessLatency(latency int64)
	// SetBulkRequestProcessLatency takes a latency as int64.
	// NOTE(review): the unit is not shown here — confirm.
	SetBulkRequestProcessLatency(latency int64)
	// PrometheusCollectors returns a slice of prometheus.Collector values.
	PrometheusCollectors() []prometheus.Collector
	// IncrementSuccessOp takes a topic name; presumably it bumps a per-topic
	// success counter — verify against the implementation.
	IncrementSuccessOp(topicName string)
	// IncrementErrOp takes a topic name; presumably it bumps a per-topic
	// error counter — verify against the implementation.
	IncrementErrOp(topicName string)
}

func NewMetric

func NewMetric(pqCDC cdc.Connector, slotName string) Metric

type Producer

// Producer is the type returned (by value) by NewProducer. Its listed
// methods — Close, GetMetric, Produce and StartBatch — are all declared on
// the pointer receiver *Producer.
type Producer struct {
	// ProducerBatch is the *Batch held by this producer.
	ProducerBatch *Batch
}

func NewProducer

func NewProducer(
	kafkaClient kafka.Client,
	config *config.Connector,
	responseHandler kafka.ResponseHandler,
	pqCDC cdc.Connector,
) (Producer, error)

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) GetMetric

func (p *Producer) GetMetric() Metric

func (*Producer) Produce

func (p *Producer) Produce(
	ctx *replication.ListenerContext,
	eventTime time.Time,
	messages []gokafka.Message,
	isLastChunk bool,
)

func (*Producer) StartBatch

func (p *Producer) StartBatch()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL