internal

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2023 License: BSD-3-Clause Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FetchMessage

func FetchMessage(broker *sarama.Broker, topic string, partition int32, offset int64, debugLogger *log.Logger) (*sarama.ConsumerMessage, error)

FetchMessage returns a single message from Kafka. Note that if you want to retrieve more than a single message, it may be more efficient to use FetchMessages(…) instead.

func FetchMessages

func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset int64, fetchSizeBytes int32, debugLogger *log.Logger) ([]*sarama.ConsumerMessage, error)

FetchMessages retrieves a bunch of messages starting at a given offset. This function is intended to be used if you are interested in a specific offset. For efficiency reasons, however, Kafka will actually return not only the requested message but also subsequent messages (i.e. ordered by offset) to fill up the response until the fetch size is reached. You can control how many bytes are requested using the fetchSizeBytes parameter.

func JoinConsumerGroup

func JoinConsumerGroup(ctx context.Context, client sarama.Client, topic, groupID string, logger *log.Logger) (<-chan *sarama.ConsumerMessage, error)

func SaveConfiguration

func SaveConfiguration(w io.Writer, conf *Configuration) error

Types

type AuthConfiguration added in v1.2.0

type AuthConfiguration struct {
	Method     string `yaml:"method,omitempty"` // either tls or sasl
	RootCAFile string `yaml:"root_ca_file,omitempty"`

	Username string `yaml:"username,omitempty"` // only used when method=sasl
	Password string `yaml:"password,omitempty"` // only used when method=sasl

	KeyFile         string `yaml:"key_file,omitempty"`         // only used when method=tls
	CertificateFile string `yaml:"certificate_file,omitempty"` // only used when method=tls
}

type AvroDecoder

type AvroDecoder struct {
	// contains filtered or unexported fields
}

AvroDecoder is a Decoder implementation that supports the Apache Avro format.

func NewAvroDecoder

func NewAvroDecoder(r SchemaRegistry) *AvroDecoder

NewAvroDecoder creates a new AvroDecoder instance.

func (*AvroDecoder) Decode

func (d *AvroDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)

Decode a message from Kafka into our own Message type.

type Configuration

type Configuration struct {
	APIVersion      string `yaml:"api_version"`
	CurrentContext  string `yaml:"current_context"`
	PreviousContext string `yaml:"previous_context,omitempty"`

	Contexts []ContextConfiguration   `yaml:"contexts"`
	Topics   []*TopicConfig           `yaml:"topics,omitempty"`
	Proto    GlobalProtoDecoderConfig `yaml:"proto,omitempty"`
}

func LoadConfiguration

func LoadConfiguration(r io.Reader) (*Configuration, error)

func NewConfiguration

func NewConfiguration() *Configuration

func (*Configuration) AddContext

func (conf *Configuration) AddContext(contextConf ContextConfiguration) error

func (*Configuration) Brokers

func (conf *Configuration) Brokers(context string) []string

func (*Configuration) Context

func (conf *Configuration) Context(name string) (ContextConfiguration, error)

func (*Configuration) DeleteContext

func (conf *Configuration) DeleteContext(name string) error

func (*Configuration) GetCurrentContext added in v1.2.0

func (conf *Configuration) GetCurrentContext() (ContextConfiguration, error)

func (*Configuration) RenameContext

func (conf *Configuration) RenameContext(oldName, newName string) error

func (*Configuration) SaramaConfig added in v1.2.0

func (conf *Configuration) SaramaConfig() (*sarama.Config, error)

func (*Configuration) SetContext

func (conf *Configuration) SetContext(name string) error

func (*Configuration) TopicConfig

func (conf *Configuration) TopicConfig(topic string) (*TopicConfig, error)

type ConfluentSchemaRegistry

type ConfluentSchemaRegistry struct {
	// contains filtered or unexported fields
}

func (*ConfluentSchemaRegistry) Schema

func (r *ConfluentSchemaRegistry) Schema(id int) (string, error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(con sarama.Consumer) *Consumer

func (*Consumer) ConsumeAllPartitions

func (c *Consumer) ConsumeAllPartitions(ctx context.Context, topic string, startOffset int64) (<-chan *sarama.ConsumerMessage, error)

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64) (<-chan *sarama.ConsumerMessage, error)

func (*Consumer) ConsumePartitions

func (c *Consumer) ConsumePartitions(ctx context.Context, topic string, partitions []PartitionOffset) (<-chan *sarama.ConsumerMessage, error)

func (*Consumer) ProcessPartitions

func (c *Consumer) ProcessPartitions(ctx context.Context, topic string, partitions []PartitionOffset, process func(sarama.PartitionConsumer)) error

type ContextConfiguration

type ContextConfiguration struct {
	Name    string            `yaml:"name"`
	Brokers []string          `yaml:"brokers"`
	Auth    AuthConfiguration `yaml:"auth,omitempty"`

	SchemaRegistry SchemaRegistryConfiguration `yaml:"schema_registry,omitempty"`
}

type Decoder

type Decoder interface {
	Decode(*sarama.ConsumerMessage) (*Message, error)
}

func NewTopicDecoder

func NewTopicDecoder(topic string, conf Configuration) (Decoder, error)

type Encoder

type Encoder interface {
	Encode(msg string) (sarama.Encoder, error)
}

func NewTopicEncoder

func NewTopicEncoder(topic string, conf Configuration) (Encoder, error)

type GlobalProtoDecoderConfig

type GlobalProtoDecoderConfig struct {
	Includes []string `yaml:"includes,omitempty"`
}

type Message

type Message struct {
	Topic     string
	Partition int32
	Offset    int64
	Headers   map[string][]string
	Timestamp time.Time
	Key       any
	Value     any
}

func NewMessage

func NewMessage(m *sarama.ConsumerMessage) *Message

NewMessage creates a new Message from a given Kafka message. The Key and Value are copied into the Message as is (i.e. without decoding).

type OffsetConsumer

type OffsetConsumer struct {
	// contains filtered or unexported fields
}

func NewOffsetConsumer

func NewOffsetConsumer(client sarama.Client, fetchSize int32, logger, debug *log.Logger) *OffsetConsumer

func (*OffsetConsumer) Consume

func (c *OffsetConsumer) Consume(ctx context.Context, topic string, partitions []PartitionOffsets) (<-chan *sarama.ConsumerMessage, error)

type PartitionOffset

type PartitionOffset struct {
	Partition int32
	Offset    int64 // can also be sarama.Newest or sarama.Oldest
}

PartitionOffset marks a specific offset in a Kafka topic partition.

type PartitionOffsetRange

type PartitionOffsetRange struct {
	Partition int32
	From      int64
	Until     *int64 // optional
}

type PartitionOffsets

type PartitionOffsets struct {
	Partition int32
	Offsets   []int64
}

type ProtoConfig

type ProtoConfig struct {
	Includes []string
	File     string
	Type     string
}

type ProtoDecoder

type ProtoDecoder struct {
	// contains filtered or unexported fields
}

func NewProtoDecoder

func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error)

func (*ProtoDecoder) Decode

func (d *ProtoDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)

type ProtoEncoder

type ProtoEncoder struct {
	// contains filtered or unexported fields
}

func NewProtoEncoder

func NewProtoEncoder(conf ProtoConfig) (*ProtoEncoder, error)

func (*ProtoEncoder) Encode

func (d *ProtoEncoder) Encode(msg string) (sarama.Encoder, error)

type RangeConsumer

type RangeConsumer struct {
	// contains filtered or unexported fields
}

func NewRangeConsumer

func NewRangeConsumer(client sarama.Client, logger *log.Logger) *RangeConsumer

func (*RangeConsumer) Consume

func (c *RangeConsumer) Consume(ctx context.Context, topic string, partitions []PartitionOffsetRange) (<-chan *sarama.ConsumerMessage, error)

type SchemaRegistry

type SchemaRegistry interface {
	Schema(id int) (string, error)
}

func NewSchemaRegistry

func NewSchemaRegistry(conf Configuration) (SchemaRegistry, error)

type SchemaRegistryConfiguration

type SchemaRegistryConfiguration struct {
	URL      string `yaml:"url"`
	Username string `yaml:"username,omitempty"`
	Password string `yaml:"password,omitempty"`
}

type StringDecoder

type StringDecoder struct{}

The StringDecoder assumes that the values of all consumed messages are Unicode strings.

func (*StringDecoder) Decode

func (d *StringDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)

type StringEncoder

type StringEncoder struct{}

func (*StringEncoder) Encode

func (e *StringEncoder) Encode(msg string) (sarama.Encoder, error)

type TopicConfig

type TopicConfig struct {
	Name   string            `yaml:"name"`
	Schema TopicSchemaConfig `yaml:"schema,omitempty"`
}

type TopicProtoConfig

type TopicProtoConfig struct {
	Type string `yaml:"type,omitempty"`
	File string `yaml:"file,omitempty"`
}

type TopicSchemaConfig

type TopicSchemaConfig struct {
	Type  string           `yaml:"type"` // "avro" or "proto"
	Proto TopicProtoConfig `yaml:"proto,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL