kafka

package
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*ToKafka) error

func WithLogger

func WithLogger(log *zap.SugaredLogger) Option

type ToKafka

type ToKafka struct {
	// contains filtered or unexported fields
}

ToKafka produce the output to a kafka sinks.

func NewToKafka

func NewToKafka(vertexInstance *dfv1.VertexInstance,
	fromBuffer isb.BufferReader,
	fetchWatermark fetch.Fetcher,
	publishWatermark publish.Publisher,
	idleManager wmb.IdleManager,
	opts ...Option) (*ToKafka, error)

NewToKafka returns ToKafka type.

func (*ToKafka) Close

func (tk *ToKafka) Close() error

func (*ToKafka) ForceStop

func (tk *ToKafka) ForceStop()

ForceStop stops sinking

func (*ToKafka) GetName

func (tk *ToKafka) GetName() string

GetName returns the name.

func (*ToKafka) GetPartitionIdx added in v0.9.0

func (tk *ToKafka) GetPartitionIdx() int32

GetPartitionIdx returns the partition index. for sink it is always 0.

func (*ToKafka) Start

func (tk *ToKafka) Start() <-chan struct{}

Start starts sinking to kafka.

func (*ToKafka) Stop

func (tk *ToKafka) Stop()

Stop stops sinking

func (*ToKafka) Write

func (tk *ToKafka) Write(_ context.Context, messages []isb.Message) ([]isb.Offset, []error)

Write writes to the kafka topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL