kafka_sarama

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2020 License: Apache-2.0 Imports: 10 Imported by: 1

Documentation

Overview

Package kafka_sarama implements the Kafka CloudEvents binding on top of the sarama client library.

Index

Constants

View Source
const ContentType = "content-type"
View Source
const (
	SKIP_KEY_EXTENSION = "SKIP_KEY_EXTENSION"
)

Variables

This section is empty.

Functions

func EncodeKafkaProducerMessage

func EncodeKafkaProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories binding.TransformerFactories) error

EncodeKafkaProducerMessage fills the provided producerMessage with the contents of the message m. The context can be used to tweak the encoding process (see the binding.Translate documentation for details).

func NewMessage

func NewMessage(cm *sarama.ConsumerMessage) (binding.Message, error)

NewMessage returns a binding.Message with data from the sarama ConsumerMessage cm.

func NewMessageFromRaw

func NewMessageFromRaw(key []byte, value []byte, contentType string, headers map[string][]byte) (binding.Message, error)

func WithSkipKeyExtension

func WithSkipKeyExtension(ctx context.Context) context.Context

Types

type Message

type Message struct {
	Key, Value  []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds the key, value, headers, and content type of a Kafka message, such as one received as a sarama ConsumerMessage.

func (*Message) Binary

func (m *Message) Binary(ctx context.Context, encoder binding.BinaryEncoder) error

func (*Message) Encoding

func (m *Message) Encoding() binding.Encoding

func (*Message) Finish

func (m *Message) Finish(error) error

func (*Message) Structured

func (m *Message) Structured(ctx context.Context, encoder binding.StructuredEncoder) error

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver(client sarama.Client, groupId string, topic string) *Receiver

func (*Receiver) Cleanup

func (*Receiver) Close

func (r *Receiver) Close(ctx context.Context) error

func (*Receiver) ConsumeClaim

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

func (*Receiver) Setup

func (r *Receiver) Setup(sess sarama.ConsumerGroupSession) error

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, m binding.Message) error

type SenderOptionFunc

type SenderOptionFunc func(sender *Sender)

func WithTranscoder

func WithTranscoder(factory binding.TransformerFactory) SenderOptionFunc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL