kafka

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*ToKafka) error

func WithLogger

func WithLogger(log *zap.SugaredLogger) Option

type ToKafka

type ToKafka struct {
	// contains filtered or unexported fields
}

ToKafka produce the output to a kafka sinks.

func NewToKafka

func NewToKafka(vertex *dfv1.Vertex,
	fromBuffer isb.BufferReader,
	fetchWatermark fetch.Fetcher,
	publishWatermark map[string]publish.Publisher,
	whereToDecider forward.GoWhere,
	opts ...Option) (*ToKafka, error)

NewToKafka returns ToKafka type.

func (*ToKafka) Close

func (tk *ToKafka) Close() error

func (*ToKafka) ForceStop

func (tk *ToKafka) ForceStop()

ForceStop stops sinking

func (*ToKafka) GetName

func (tk *ToKafka) GetName() string

GetName returns the name.

func (*ToKafka) GetPartitionIdx added in v0.9.0

func (tk *ToKafka) GetPartitionIdx() int32

GetPartitionIdx returns the partition index. for sink it is always 0.

func (*ToKafka) Start

func (tk *ToKafka) Start() <-chan struct{}

Start starts sinking to kafka.

func (*ToKafka) Stop

func (tk *ToKafka) Stop()

Stop stops sinking

func (*ToKafka) Write

func (tk *ToKafka) Write(_ context.Context, messages []isb.Message) ([]isb.Offset, []error)

Write writes to the kafka topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL