kafka_confluent

package module
v2.0.0-...-a1fb808 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2025 License: Apache-2.0 Imports: 15 Imported by: 5

Documentation

Index

Constants

View Source
const (
	KafkaOffsetKey    = "kafkaoffset"
	KafkaPartitionKey = "kafkapartition"
	KafkaTopicKey     = "kafkatopic"
	KafkaMessageKey   = "kafkamessagekey"
)

Variables

This section is empty.

Functions

func MessageKeyFrom

func MessageKeyFrom(ctx context.Context) string

MessageKeyFrom looks in the given context and returns `messageKey` as a string if found and valid, otherwise "".

func TopicPartitionOffsetsFrom

func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition

TopicPartitionOffsetsFrom looks in the given context and returns []kafka.TopicPartition or nil if not set

func WithMessageKey

func WithMessageKey(ctx context.Context, messageKey string) context.Context

WithMessageKey returns back a new context with the given messageKey.

func WithTopicPartitionOffsets

func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context

WithTopicPartitionOffsets will set the positions where the consumer starts consuming from.

func WriteProducerMessage

func WriteProducerMessage(ctx context.Context, in binding.Message, kafkaMsg *kafka.Message,
	transformers ...binding.Transformer,
) error

WriteProducerMessage fills the provided pubMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Write documentation).

Types

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a Kafka message. This message *can* be read several times safely

func NewMessage

func NewMessage(msg *kafka.Message) *Message

NewMessage returns a binding.Message that holds the provided kafka.Message. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance

func (*Message) Finish

func (m *Message) Finish(error) error

func (*Message) GetAttribute

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type Option

type Option func(*Protocol) error

Option is the function signature required to be considered an kafka_confluent.Option.

func WithConfigMap

func WithConfigMap(config *kafka.ConfigMap) Option

WithConfigMap sets the configMap to init the kafka client.

func WithErrorHandler

func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option

WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled.

func WithPollTimeout

func WithPollTimeout(timeoutMs int) Option

WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout.

func WithRebalanceCallBack

func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option

WithRebalanceCallBack sets the callback for rebalancing of the consumer group.

func WithReceiver

func WithReceiver(consumer *kafka.Consumer) Option

WithReceiver set a kafka.Consumer instance to init the client directly.

func WithReceiverTopics

func WithReceiverTopics(topics []string) Option

WithReceiverTopics sets the topics for the kafka.Consumer.

func WithSender

func WithSender(producer *kafka.Producer) Option

WithSender set a kafka.Producer instance to init the client directly.

func WithSenderTopic

func WithSenderTopic(defaultTopic string) Option

WithSenderTopic sets the defaultTopic for the kafka.Producer.

type Protocol

type Protocol struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) (*Protocol, error)

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

Close cleans up resources after use. Must be called to properly close underlying Kafka resources and avoid resource leaks

func (*Protocol) Events

func (p *Protocol) Events() (chan kafka.Event, error)

Events returns the events channel used by Confluent Kafka to deliver the result from a produce, i.e., send, operation. When using this SDK to produce (send) messages, this channel must be monitored to avoid resource leaks and this channel becoming full. See Confluent SDK for Go for details on the implementation.

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

Send message by kafka.Producer. You must monitor the Events() channel when using this function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL