kafka_franz

package
v1.0.8-benthos-in-go-r... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CloudEventToKafkaMessage

func CloudEventToKafkaMessage(ctx context.Context, ce *proto_cloudevents.CloudEvent) (*kgo.Record, error)

CloudEventToKafkaMessage converts a grpc CloudEvent to a kafka.Message

func MakeKafkaMessage

func MakeKafkaMessage(ctx context.Context, event cloudevents.Event) (*kgo.Record, error)

MakeKafkaMessage from cloudEvent sdk-go Event

func WithMessageKey

func WithMessageKey(ctx context.Context, key []byte) context.Context

WithMessageKey allows to set the key used when sending the producer message

func WithSkipKeyMapping

func WithSkipKeyMapping(ctx context.Context) context.Context

func WriteProducerMessage

func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *kgo.Record, transformers ...binding.Transformer) error

WriteProducerMessage fills the provided producerMessage with the message m. Using context you can tweak the encoding processing (more details on binding.Write documentation). By default, this function implements the key mapping, trying to set the key of the message based on partitionKey extension. If you want to disable the Key Mapping, decorate the context with `WithSkipKeyMapping`

Types

type Message

type Message struct {
	Value       []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a Kafka Message. This message *can* be read several times safely

func NewMessage

func NewMessage(value []byte, contentType string, headers map[string][]byte) *Message

NewMessage returns a binding.Message that holds the provided kafka message components. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_segmentio.Message instance

func NewMessageFromConsumerMessage

func NewMessageFromConsumerMessage(cm *kgo.Record) *Message

NewMessageFromConsumerMessage returns a binding.Message that holds the provided ConsumerMessage. The returned binding.Message *can* be read several times safely This function *doesn't* guarantee that the returned binding.Message is always a kafka_segmentio.Message instance

func (*Message) Finish

func (m *Message) Finish(error) error

func (*Message) GetAttribute

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)

func (*Message) ReadEncoding

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL