Documentation ¶
Index ¶
- func MakeSaramaKafkaEndpointSink[T, R any](stream runtime.TypedSinkStream[T, R], partitioner Partitioner[T]) runtime.SinkConsumer[T]
- type MessageMetadata
- type Partitioner
- type SaramaKafkaDataSink
- func (ds *SaramaKafkaDataSink) Partition(msg *kafka.ProducerMessage, numPartitions int32) (int32, error)
- func (ds *SaramaKafkaDataSink) RequiresConsistency() bool
- func (ds *SaramaKafkaDataSink) SendMessage(msg *kafka.ProducerMessage)
- func (ds *SaramaKafkaDataSink) Start(ctx context.Context) error
- func (ds *SaramaKafkaDataSink) Stop(ctx context.Context)
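- type SaramaKafkaEndpoint
- func (ep *SaramaKafkaEndpoint) SendMessage(key []byte, value []byte, metadata *MessageMetadata)
- func (ep *SaramaKafkaEndpoint) Start(ctx context.Context, admin kafka.ClusterAdmin) error
- func (ep *SaramaKafkaEndpoint) Stop(ctx context.Context)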
- type SaramaKafkaEndpointConsumer
- type SaramaKafkaOutputDataSink
- type SaramaKafkaSinkEndpoint
- type TypedSaramaKafkaEndpointConsumer
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Consume(value T)
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Partition(message *kafka.ProducerMessage, numPartitions int32) (int32, error)
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendError(errMsg *kafka.ProducerError)
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendSuccess(msg *kafka.ProducerMessage)
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SetSinkCallback(callback runtime.SinkCallback[T])
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Start(ctx context.Context) error
- func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Stop(ctx context.Context)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MakeSaramaKafkaEndpointSink ¶
func MakeSaramaKafkaEndpointSink[T, R any](stream runtime.TypedSinkStream[T, R], partitioner Partitioner[T]) runtime.SinkConsumer[T]
Types ¶
type MessageMetadata ¶
type MessageMetadata struct {
// contains filtered or unexported fields
}
type Partitioner ¶
type SaramaKafkaDataSink ¶
type SaramaKafkaDataSink struct {
*runtime.OutputDataSink
// contains filtered or unexported fields
}
func (*SaramaKafkaDataSink) Partition ¶
func (ds *SaramaKafkaDataSink) Partition(msg *kafka.ProducerMessage, numPartitions int32) (int32, error)
func (*SaramaKafkaDataSink) RequiresConsistency ¶
func (ds *SaramaKafkaDataSink) RequiresConsistency() bool
func (*SaramaKafkaDataSink) SendMessage ¶
func (ds *SaramaKafkaDataSink) SendMessage(msg *kafka.ProducerMessage)
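func (*SaramaKafkaDataSink) Start ¶
func (ds *SaramaKafkaDataSink) Start(ctx context.Context) error
func (*SaramaKafkaDataSink) Stop ¶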
func (ds *SaramaKafkaDataSink) Stop(ctx context.Context)
type SaramaKafkaEndpoint ¶
type SaramaKafkaEndpoint struct {
*runtime.DataSinkEndpoint
}
func (*SaramaKafkaEndpoint) SendMessage ¶
func (ep *SaramaKafkaEndpoint) SendMessage(key []byte, value []byte, metadata *MessageMetadata)
func (*SaramaKafkaEndpoint) Start ¶
func (ep *SaramaKafkaEndpoint) Start(ctx context.Context, admin kafka.ClusterAdmin) error
func (*SaramaKafkaEndpoint) Stop ¶
func (ep *SaramaKafkaEndpoint) Stop(ctx context.Context)
type SaramaKafkaEndpointConsumer ¶
type SaramaKafkaEndpointConsumer interface {
runtime.OutputEndpointConsumer
Start(context.Context) error
Stop(context.Context)
SendSuccess(msg *kafka.ProducerMessage)
SendError(errMsg *kafka.ProducerError)
Partition(message *kafka.ProducerMessage, numPartitions int32) (int32, error)
}
type SaramaKafkaOutputDataSink ¶
type SaramaKafkaOutputDataSink interface {
runtime.DataSink
SendMessage(*kafka.ProducerMessage)
}
type SaramaKafkaSinkEndpoint ¶
type SaramaKafkaSinkEndpoint interface {
runtime.SinkEndpoint
Start(context.Context, kafka.ClusterAdmin) error
Stop(context.Context)
SendMessage(key []byte, value []byte, metadata *MessageMetadata)
}
type TypedSaramaKafkaEndpointConsumer ¶
type TypedSaramaKafkaEndpointConsumer[T, R any] struct {
*runtime.DataSinkEndpointConsumer[T, R]
// contains filtered or unexported fields
}
func (*TypedSaramaKafkaEndpointConsumer[T, R]) Consume ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Consume(value T)
func (*TypedSaramaKafkaEndpointConsumer[T, R]) Partition ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Partition(message *kafka.ProducerMessage, numPartitions int32) (int32, error)
func (*TypedSaramaKafkaEndpointConsumer[T, R]) SendError ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendError(errMsg *kafka.ProducerError)
func (*TypedSaramaKafkaEndpointConsumer[T, R]) SendSuccess ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SendSuccess(msg *kafka.ProducerMessage)
func (*TypedSaramaKafkaEndpointConsumer[T, R]) SetSinkCallback ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) SetSinkCallback(callback runtime.SinkCallback[T])
func (*TypedSaramaKafkaEndpointConsumer[T, R]) Start ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Start(ctx context.Context) error
func (*TypedSaramaKafkaEndpointConsumer[T, R]) Stop ¶
func (ec *TypedSaramaKafkaEndpointConsumer[T, R]) Stop(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.