Documentation ¶
Index ¶
- func WriteProducerMessage(ctx context.Context, m binding.Message, ...) error
- type Consumer
- type Message
- type Protocol
- type ProtocolOptionFunc
- type Receiver
- func (r *Receiver) Cleanup(sarama.ConsumerGroupSession) error
- func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)
- func (r *Receiver) Setup(sess sarama.ConsumerGroupSession) error
- type Sender
- type SenderOptionFunc
Functions ¶
func WriteProducerMessage ¶
func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformerFactories ...binding.TransformerFactory) error
WriteProducerMessage fills the provided producerMessage with the message m. Use the context to tweak the encoding process (see the binding.Write documentation for details).
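As a rough illustration of what "filling" a producer message can look like, the sketch below writes a minimal event into a message in binary content mode, following the CloudEvents Kafka binding convention of `ce_`-prefixed headers. The types `header`, `producerMessage`, and `event` and the function `writeBinary` are hypothetical stand-ins invented for this example; they are not the sarama or kafka_sarama types, and the real WriteProducerMessage may behave differently.

```go
package main

import (
	"fmt"
	"sort"
)

// header and producerMessage are hypothetical stand-ins for
// sarama.RecordHeader and sarama.ProducerMessage.
type header struct{ Key, Value []byte }

type producerMessage struct {
	Topic   string
	Headers []header
	Value   []byte
}

// event is a hypothetical, minimal CloudEvent: context attributes plus data.
type event struct {
	Attributes      map[string]string
	DataContentType string
	Data            []byte
}

// writeBinary fills pm in place: each attribute becomes a "ce_"-prefixed
// header, the data content type goes into "content-type", and the data
// becomes the record value. The topic is left untouched.
func writeBinary(e event, pm *producerMessage) {
	names := make([]string, 0, len(e.Attributes))
	for name := range e.Attributes {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic header order for the example

	pm.Headers = pm.Headers[:0]
	for _, name := range names {
		pm.Headers = append(pm.Headers, header{
			Key:   []byte("ce_" + name),
			Value: []byte(e.Attributes[name]),
		})
	}
	pm.Headers = append(pm.Headers, header{
		Key:   []byte("content-type"),
		Value: []byte(e.DataContentType),
	})
	pm.Value = e.Data
}

func main() {
	pm := &producerMessage{Topic: "example-topic"}
	writeBinary(event{
		Attributes:      map[string]string{"specversion": "1.0", "type": "com.example.created"},
		DataContentType: "application/json",
		Data:            []byte(`{"id":42}`),
	}, pm)

	for _, h := range pm.Headers {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
	fmt.Printf("value=%s\n", pm.Value)
}
```

The caller owns the message and passes a pointer, so the function only has to populate it; this mirrors the shape of WriteProducerMessage, which takes a `*sarama.ProducerMessage` rather than returning a new one.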
Types ¶
type Consumer ¶
type Consumer struct {
Receiver
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumerFromClient ¶
type Message ¶
type Message struct {
Key, Value []byte
Headers map[string][]byte
ContentType string
// contains filtered or unexported fields
}
Message holds a Kafka message. This message *can* be read several times safely.
func NewMessage ¶
NewMessage returns a binding.Message that holds the provided Kafka message components. The returned binding.Message *can* be read several times safely. This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance.
func NewMessageFromConsumerMessage ¶
func NewMessageFromConsumerMessage(cm *sarama.ConsumerMessage) *Message
NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage. The returned binding.Message *can* be read several times safely. This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance.
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
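One way to see why a message backed by byte slices can be read several times, and how its encoding might be detected, is the sketch below. It assumes the CloudEvents Kafka binding conventions (a content type starting with `application/cloudevents` for structured mode, a `ce_specversion` header for binary mode). The `message` type and its methods are hypothetical stand-ins written for this example, not the actual kafka_sarama.Message implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// encoding is a hypothetical stand-in for binding.Encoding.
type encoding int

const (
	encodingUnknown encoding = iota
	encodingBinary
	encodingStructured
)

// message is a hypothetical stand-in mirroring the exported fields of Message.
type message struct {
	Key, Value  []byte
	Headers     map[string][]byte
	ContentType string
}

// readEncoding guesses the content mode from the content type and headers.
func (m *message) readEncoding() encoding {
	if strings.HasPrefix(m.ContentType, "application/cloudevents") {
		return encodingStructured
	}
	if _, ok := m.Headers["ce_specversion"]; ok {
		return encodingBinary
	}
	return encodingUnknown
}

// body returns the payload. Nothing is consumed: the bytes stay in memory,
// so calling body repeatedly yields the same content every time.
func (m *message) body() []byte {
	return m.Value
}

func main() {
	m := &message{
		Value:       []byte(`{"hello":"world"}`),
		Headers:     map[string][]byte{"ce_specversion": []byte("1.0")},
		ContentType: "application/json",
	}

	fmt.Println("binary mode:", m.readEncoding() == encodingBinary)
	fmt.Println("first read: ", string(m.body()))
	fmt.Println("second read:", string(m.body()))
}
```

Contrast this with a message backed by a stream such as an `io.Reader`, where a second read would find the payload already drained.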
type Protocol ¶
type Protocol struct {
// Kafka
Client sarama.Client

// Sender
Sender *Sender

// Sender options
SenderContextDecorators []func(context.Context) context.Context

// Consumer
Consumer *Consumer
// contains filtered or unexported fields
}
func NewProtocol ¶
func NewProtocol(brokers []string, saramaConfig *sarama.Config, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)
NewProtocol creates a new kafka transport.
func NewProtocolFromClient ¶
func NewProtocolFromClient(client sarama.Client, sendToTopic string, receiveFromTopic string, opts ...ProtocolOptionFunc) (*Protocol, error)
NewProtocolFromClient creates a new kafka transport starting from a sarama.Client.
func (*Protocol) HasTracePropagation ¶
HasTracePropagation implements Protocol.HasTracePropagation.
func (*Protocol) OpenInbound ¶
OpenInbound implements Opener.OpenInbound. NOTE: This is a blocking call.
type ProtocolOptionFunc ¶
type ProtocolOptionFunc func(protocol *Protocol)
ProtocolOptionFunc is an option applied to a kafka_sarama.Protocol.
func WithReceiverGroupId ¶
func WithReceiverGroupId(groupId string) ProtocolOptionFunc
func WithSenderContextDecorators ¶
func WithSenderContextDecorators(decorator func(context.Context) context.Context) ProtocolOptionFunc
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver implements sarama.ConsumerGroupHandler. After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.
func NewReceiver ¶
func NewReceiver() *Receiver
NewReceiver creates a Receiver which implements sarama.ConsumerGroupHandler. The sarama.ConsumerGroup must be started by the caller. If you need a Receiver which also manages the ConsumerGroup, use NewConsumer. After the first invocation of Receiver.Receive(), the sarama.ConsumerGroup is created and started.
func (*Receiver) ConsumeClaim ¶
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender implements binding.Sender and sends messages to a specific receiverTopic using sarama.SyncProducer.
func NewSender ¶
func NewSender(brokers []string, saramaConfig *sarama.Config, topic string, options ...SenderOptionFunc) (*Sender, error)
NewSender returns a binding.Sender that sends messages to a specific receiverTopic using sarama.SyncProducer.
func NewSenderFromClient ¶
func NewSenderFromClient(client sarama.Client, topic string, options ...SenderOptionFunc) (*Sender, error)
NewSenderFromClient returns a binding.Sender, built from an existing sarama.Client, that sends messages to a specific receiverTopic using sarama.SyncProducer.
type SenderOptionFunc ¶
type SenderOptionFunc func(sender *Sender)
SenderOptionFunc is an option applied to a kafka_sarama.Sender.
func WithTransformer ¶
func WithTransformer(transformer binding.TransformerFactory) SenderOptionFunc
WithTransformer adds a transformer, which Sender uses while encoding a binding.Message to a sarama.ProducerMessage.