Documentation ¶
Overview ¶
Package kafka implements the Kafka CloudEvents binding.
Index ¶
- Constants
- func EncodeKafkaProducerMessage(ctx context.Context, m binding.Message, ...) error
- func NewMessage(cm *sarama.ConsumerMessage) (binding.Message, error)
- func NewMessageFromRaw(key []byte, value []byte, contentType string, headers map[string][]byte) (binding.Message, error)
- func WithSkipKeyExtension(ctx context.Context) context.Context
- type Message
- type Receiver
- func (r *Receiver) Cleanup(sarama.ConsumerGroupSession) error
- func (r *Receiver) Close(ctx context.Context) error
- func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)
- func (r *Receiver) Setup(sess sarama.ConsumerGroupSession) error
- type Sender
- type SenderOptionFunc
Constants ¶
View Source
const ContentType = "content-type"
View Source
const (
SKIP_KEY_EXTENSION = "SKIP_KEY_EXTENSION"
)
Variables ¶
This section is empty.
Functions ¶
func EncodeKafkaProducerMessage ¶
func EncodeKafkaProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories binding.TransformerFactories) error
Fill the provided producerMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Translate documentation).
func NewMessage ¶
func NewMessage(cm *sarama.ConsumerMessage) (binding.Message, error)
NewMessage returns a Message with data from body. Reads and closes body.
func NewMessageFromRaw ¶
Types ¶
type Message ¶
type Message struct {
Key, Value []byte
Headers map[string][]byte
ContentType string
// contains filtered or unexported fields
}
Message holds a sarama ConsumerMessage.
func (*Message) Structured ¶
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
func (*Receiver) ConsumeClaim ¶
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type SenderOptionFunc ¶
type SenderOptionFunc func(sender *Sender)
func WithTranscoder ¶
func WithTranscoder(factory binding.TransformerFactory) SenderOptionFunc
Click to show internal directories.
Click to hide internal directories.