Documentation ¶
Index ¶
- Constants
- func MessageKeyFrom(ctx context.Context) string
- func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition
- func WithMessageKey(ctx context.Context, messageKey string) context.Context
- func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context
- func WriteProducerMessage(ctx context.Context, in binding.Message, kafkaMsg *kafka.Message, ...) error
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type Option
- func WithConfigMap(config *kafka.ConfigMap) Option
- func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option
- func WithPollTimeout(timeoutMs int) Option
- func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option
- func WithReceiver(consumer *kafka.Consumer) Option
- func WithReceiverTopics(topics []string) Option
- func WithSender(producer *kafka.Producer) Option
- func WithSenderTopic(defaultTopic string) Option
- type Protocol
- func (p *Protocol) Close(ctx context.Context) error
- func (p *Protocol) Events() (chan kafka.Event, error)
- func (p *Protocol) OpenInbound(ctx context.Context) error
- func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)
- func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)
Constants ¶
const (
	KafkaOffsetKey    = "kafkaoffset"
	KafkaPartitionKey = "kafkapartition"
	KafkaTopicKey     = "kafkatopic"
	KafkaMessageKey   = "kafkamessagekey"
)
Variables ¶
This section is empty.
Functions ¶
func MessageKeyFrom ¶
MessageKeyFrom looks in the given context and returns `messageKey` as a string if found and valid, otherwise "".
func TopicPartitionOffsetsFrom ¶
func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition
TopicPartitionOffsetsFrom looks in the given context and returns the stored []kafka.TopicPartition, or nil if none is set.
func WithMessageKey ¶
WithMessageKey returns a new context carrying the given messageKey.
func WithTopicPartitionOffsets ¶
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context
WithTopicPartitionOffsets returns a new context that sets the positions from which the consumer starts consuming.
func WriteProducerMessage ¶
func WriteProducerMessage(ctx context.Context, in binding.Message, kafkaMsg *kafka.Message, transformers ...binding.Transformer) error
WriteProducerMessage fills the provided kafkaMsg with the message in. The context can be used to tweak the encoding process (see the binding.Write documentation for details).
Types ¶
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents a Kafka message. This message *can* be read several times safely.
func NewMessage ¶
NewMessage returns a binding.Message that holds the provided kafka.Message. The returned binding.Message *can* be read several times safely. This function *doesn't* guarantee that the returned binding.Message is always a kafka_confluent.Message instance.
func (*Message) GetAttribute ¶
func (*Message) GetExtension ¶
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
type Option ¶
Option is the function signature required to be considered a kafka_confluent.Option.
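The With... constructors listed in this section follow the functional-options pattern. The sketch below shows how such options are typically defined and applied. It is an assumption-laden illustration: the `protocol` struct, its fields, the `option` signature, the default timeout, and `newProtocol` are all hypothetical and do not reproduce this package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// protocol is a hypothetical stand-in for the package's Protocol type.
type protocol struct {
	senderTopic    string
	receiverTopics []string
	pollTimeoutMs  int
}

// option is the assumed shape of a functional option: it mutates the value
// under construction and may reject invalid input.
type option func(*protocol) error

func withSenderTopic(topic string) option {
	return func(p *protocol) error {
		if topic == "" {
			return errors.New("sender topic must not be empty")
		}
		p.senderTopic = topic
		return nil
	}
}

func withReceiverTopics(topics []string) option {
	return func(p *protocol) error {
		p.receiverTopics = topics
		return nil
	}
}

func withPollTimeout(timeoutMs int) option {
	return func(p *protocol) error {
		p.pollTimeoutMs = timeoutMs
		return nil
	}
}

// newProtocol applies the options in order and stops at the first error.
func newProtocol(opts ...option) (*protocol, error) {
	p := &protocol{pollTimeoutMs: 100} // default chosen for illustration only
	for _, opt := range opts {
		if err := opt(p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

func main() {
	p, err := newProtocol(
		withSenderTopic("orders"),
		withReceiverTopics([]string{"orders", "refunds"}),
		withPollTimeout(250),
	)
	fmt.Println(p, err)
}
```

The appeal of this design is that new settings can be added without changing the constructor's signature, and each option can validate its own input.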
func WithConfigMap ¶
WithConfigMap sets the kafka.ConfigMap used to initialize the Kafka client.
func WithErrorHandler ¶
WithErrorHandler provides a function for handling any kafka.Error that the kafka.Consumer has polled.
func WithPollTimeout ¶
WithPollTimeout sets the timeout, in milliseconds, for the consumer polling for messages or events. The poll returns nil on timeout.
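The "nil on timeout" behavior can be pictured as a bounded wait on a channel. The sketch below is a standard-library illustration only; `poll` and its string events are hypothetical and are not the consumer's actual polling code.

```go
package main

import (
	"fmt"
	"time"
)

// poll waits up to timeoutMs milliseconds for one event. It returns a pointer
// to the event, or nil if the timeout elapses first.
func poll(events <-chan string, timeoutMs int) *string {
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()

	select {
	case ev := <-events:
		return &ev
	case <-timer.C:
		return nil
	}
}

func main() {
	events := make(chan string, 1)

	// Nothing is queued yet, so this call times out.
	fmt.Println(poll(events, 10) == nil)

	events <- "message-1"
	if ev := poll(events, 10); ev != nil {
		fmt.Println(*ev)
	}
}
```

A shorter timeout makes a receive loop more responsive to cancellation, at the cost of more frequent empty polls.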
func WithRebalanceCallBack ¶
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option
WithRebalanceCallBack sets the callback for rebalancing of the consumer group.
func WithReceiver ¶
WithReceiver sets a kafka.Consumer instance to initialize the client directly.
func WithReceiverTopics ¶
WithReceiverTopics sets the topics for the kafka.Consumer.
func WithSender ¶
WithSender sets a kafka.Producer instance to initialize the client directly.
func WithSenderTopic ¶
WithSenderTopic sets the defaultTopic for the kafka.Producer.
type Protocol ¶
type Protocol struct {
// contains filtered or unexported fields
}
func (*Protocol) Close ¶
Close cleans up resources after use. It must be called to properly close the underlying Kafka resources and avoid resource leaks.
func (*Protocol) Events ¶
Events returns the events channel used by Confluent Kafka to deliver the result of a produce (send) operation. When using this SDK to produce messages, this channel must be monitored to avoid resource leaks and to keep the channel from filling up. See the Confluent SDK for Go for details on the implementation.