kafka

package
v0.0.191 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 | License: BSD-3-Clause | Imports: 12 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MakeSaramaKafkaEndpointSink

func MakeSaramaKafkaEndpointSink[T, R any](stream runtime.TypedSinkStream[T, R], partitioner Partitioner[T]) runtime.SinkConsumer[T]

Types

type MessageMetadata

type MessageMetadata struct {
	// contains filtered or unexported fields
}

type Partitioner

type Partitioner[T any] interface {
	Partition(value T, numPartitions int32) (int32, error)
}

type SaramaKafkaDataSink

type SaramaKafkaDataSink struct {
	*runtime.OutputDataSink
	// contains filtered or unexported fields
}

func (*SaramaKafkaDataSink) Partition

func (ds *SaramaKafkaDataSink) Partition(msg *kafka.ProducerMessage, numPartitions int32) (int32, error)

func (*SaramaKafkaDataSink) RequiresConsistency

func (ds *SaramaKafkaDataSink) RequiresConsistency() bool

func (*SaramaKafkaDataSink) SendMessage

func (ds *SaramaKafkaDataSink) SendMessage(msg *kafka.ProducerMessage)

func (*SaramaKafkaDataSink) Start

func (ds *SaramaKafkaDataSink) Start(ctx context.Context) error

func (*SaramaKafkaDataSink) Stop

func (ds *SaramaKafkaDataSink) Stop(ctx context.Context)

type SaramaKafkaEndpoint

type SaramaKafkaEndpoint struct {
	*runtime.DataSinkEndpoint
}

func (*SaramaKafkaEndpoint) SendMessage

func (ep *SaramaKafkaEndpoint) SendMessage(key []byte, value []byte, metadata *MessageMetadata)

func (*SaramaKafkaEndpoint) Start

func (*SaramaKafkaEndpoint) Stop

func (ep *SaramaKafkaEndpoint) Stop(ctx context.Context)

type SaramaKafkaEndpointConsumer

type SaramaKafkaEndpointConsumer interface {
	runtime.OutputEndpointConsumer
	Start(context.Context) error
	Stop(context.Context)
	SendSuccess(msg *kafka.ProducerMessage)
	SendError(errMsg *kafka.ProducerError)
	Partition(message *kafka.ProducerMessage, numPartitions int32) (int32, error)
}

type SaramaKafkaOutputDataSink

type SaramaKafkaOutputDataSink interface {
	runtime.DataSink
	SendMessage(*kafka.ProducerMessage)
}

type SaramaKafkaSinkEndpoint

type SaramaKafkaSinkEndpoint interface {
	runtime.SinkEndpoint
	Start(context.Context, kafka.ClusterAdmin) error
	Stop(context.Context)
	SendMessage(key []byte, value []byte, metadata *MessageMetadata)
}

type TypedSaramaKafkaEndpointConsumer

type TypedSaramaKafkaEndpointConsumer[T, R any] struct {
	*runtime.DataSinkEndpointConsumer[T, R]
	// contains filtered or unexported fields
}

func (*TypedSaramaKafkaEndpointConsumer[T, R]) Consume

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Consume(value T)

func (*TypedSaramaKafkaEndpointConsumer[T, R]) Partition

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Partition(message *kafka.ProducerMessage,
	numPartitions int32) (int32, error)

func (*TypedSaramaKafkaEndpointConsumer[T, R]) SendError

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendError(errMsg *kafka.ProducerError)

func (*TypedSaramaKafkaEndpointConsumer[T, R]) SendSuccess

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendSuccess(msg *kafka.ProducerMessage)

func (*TypedSaramaKafkaEndpointConsumer[T, R]) SetSinkCallback

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SetSinkCallback(callback runtime.SinkCallback[T])

func (*TypedSaramaKafkaEndpointConsumer[T, R]) Start

func (*TypedSaramaKafkaEndpointConsumer[T, R]) Stop

func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Stop(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL