Documentation ¶
Index ¶
- func FetchMessage(broker *sarama.Broker, topic string, partition int32, offset int64, ...) (*sarama.ConsumerMessage, error)
- func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset int64, ...) ([]*sarama.ConsumerMessage, error)
- func JoinConsumerGroup(ctx context.Context, client sarama.Client, topic, groupID string, ...) (<-chan *sarama.ConsumerMessage, error)
- func SaveConfiguration(w io.Writer, conf *Configuration) error
- type AuthConfiguration
- type AvroDecoder
- type Configuration
- func (conf *Configuration) AddContext(contextConf ContextConfiguration) error
- func (conf *Configuration) Brokers(context string) []string
- func (conf *Configuration) Context(name string) (ContextConfiguration, error)
- func (conf *Configuration) DeleteContext(name string) error
- func (conf *Configuration) GetCurrentContext() (ContextConfiguration, error)
- func (conf *Configuration) RenameContext(oldName, newName string) error
- func (conf *Configuration) SaramaConfig() (*sarama.Config, error)
- func (conf *Configuration) SetContext(name string) error
- func (conf *Configuration) TopicConfig(topic string) (*TopicConfig, error)
- type ConfluentSchemaRegistry
- type Consumer
- func (c *Consumer) ConsumeAllPartitions(ctx context.Context, topic string, startOffset int64) (<-chan *sarama.ConsumerMessage, error)
- func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64) (<-chan *sarama.ConsumerMessage, error)
- func (c *Consumer) ConsumePartitions(ctx context.Context, topic string, partitions []PartitionOffset) (<-chan *sarama.ConsumerMessage, error)
- func (c *Consumer) ProcessPartitions(ctx context.Context, topic string, partitions []PartitionOffset, ...) error
- type ContextConfiguration
- type Decoder
- type Encoder
- type GlobalProtoDecoderConfig
- type Message
- type OffsetConsumer
- type PartitionOffset
- type PartitionOffsetRange
- type PartitionOffsets
- type ProtoConfig
- type ProtoDecoder
- type ProtoEncoder
- type RangeConsumer
- type SchemaRegistry
- type SchemaRegistryConfiguration
- type StringDecoder
- type StringEncoder
- type TopicConfig
- type TopicProtoConfig
- type TopicSchemaConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FetchMessage ¶
func FetchMessage(broker *sarama.Broker, topic string, partition int32, offset int64, debugLogger *log.Logger) (*sarama.ConsumerMessage, error)
FetchMessage returns a single message from Kafka. Note that if you want to retrieve more than a single message, it is usually more efficient to use FetchMessages(…) instead.
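A minimal usage sketch (not part of the package docs): it assumes a reachable broker at localhost:9092, a topic named "example", and the usual fmt, log, and sarama imports; the package qualifier for FetchMessage is omitted because the import path is not shown on this page.

broker := sarama.NewBroker("localhost:9092")
if err := broker.Open(sarama.NewConfig()); err != nil {
    log.Fatal(err)
}
defer broker.Close()

// Fetch the single message stored at offset 42 of partition 0.
msg, err := FetchMessage(broker, "example", 0, 42, log.Default())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("offset=%d key=%q value=%q\n", msg.Offset, msg.Key, msg.Value)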
func FetchMessages ¶
func FetchMessages(broker *sarama.Broker, topic string, partition int32, offset int64, fetchSizeBytes int32, debugLogger *log.Logger) ([]*sarama.ConsumerMessage, error)
FetchMessages retrieves a batch of messages starting at a given offset. This function is intended to be used if you are interested in a specific offset; for efficiency reasons, Kafka will return not only the requested message but also subsequent messages (i.e. ordered by offset) to fill up the response until the fetch size is reached. You can control how many bytes are requested via the fetchSizeBytes parameter.
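A hedged sketch of batch fetching, reusing the broker and topic placeholders from the FetchMessage example above; the 1 MiB fetch size is an arbitrary illustration value.

// Request roughly 1 MiB worth of messages starting at offset 42 of partition 0.
msgs, err := FetchMessages(broker, "example", 0, 42, 1024*1024, log.Default())
if err != nil {
    log.Fatal(err)
}
for _, m := range msgs {
    fmt.Printf("offset=%d value=%q\n", m.Offset, m.Value)
}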
func JoinConsumerGroup ¶
func SaveConfiguration ¶
func SaveConfiguration(w io.Writer, conf *Configuration) error
Types ¶
type AuthConfiguration ¶ added in v1.2.0
type AuthConfiguration struct {
    Method          string `yaml:"method,omitempty"` // either tls or sasl
    RootCAFile      string `yaml:"root_ca_file,omitempty"`
    Username        string `yaml:"username,omitempty"` // only used when method=sasl
    Password        string `yaml:"password,omitempty"` // only used when method=sasl
    KeyFile         string `yaml:"key_file,omitempty"`         // only used when method=tls
    CertificateFile string `yaml:"certificate_file,omitempty"` // only used when method=tls
}
type AvroDecoder ¶
type AvroDecoder struct {
// contains filtered or unexported fields
}
AvroDecoder is a Decoder implementation that supports the Apache Avro format.
func NewAvroDecoder ¶
func NewAvroDecoder(r SchemaRegistry) *AvroDecoder
NewAvroDecoder creates a new AvroDecoder instance.
func (*AvroDecoder) Decode ¶
func (d *AvroDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)
Decode a message from Kafka into our own Message type.
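A minimal decoding sketch, assuming conf is a Configuration whose current context points at a schema registry and kafkaMsg is a *sarama.ConsumerMessage obtained elsewhere (both are placeholders, not defined by this package).

registry, err := NewSchemaRegistry(conf) // conf: Configuration with a schema_registry section
if err != nil {
    log.Fatal(err)
}

decoder := NewAvroDecoder(registry)
msg, err := decoder.Decode(kafkaMsg) // kafkaMsg: *sarama.ConsumerMessage
if err != nil {
    log.Fatal(err)
}
fmt.Println(msg.Key, msg.Value)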
type Configuration ¶
type Configuration struct {
    APIVersion      string                   `yaml:"api_version"`
    CurrentContext  string                   `yaml:"current_context"`
    PreviousContext string                   `yaml:"previous_context,omitempty"`
    Contexts        []ContextConfiguration   `yaml:"contexts"`
    Topics          []*TopicConfig           `yaml:"topics,omitempty"`
    Proto           GlobalProtoDecoderConfig `yaml:"proto,omitempty"`
}
func LoadConfiguration ¶
func LoadConfiguration(r io.Reader) (*Configuration, error)
func NewConfiguration ¶
func NewConfiguration() *Configuration
func (*Configuration) AddContext ¶
func (conf *Configuration) AddContext(contextConf ContextConfiguration) error
func (*Configuration) Brokers ¶
func (conf *Configuration) Brokers(context string) []string
func (*Configuration) Context ¶
func (conf *Configuration) Context(name string) (ContextConfiguration, error)
func (*Configuration) DeleteContext ¶
func (conf *Configuration) DeleteContext(name string) error
func (*Configuration) GetCurrentContext ¶ added in v1.2.0
func (conf *Configuration) GetCurrentContext() (ContextConfiguration, error)
func (*Configuration) RenameContext ¶
func (conf *Configuration) RenameContext(oldName, newName string) error
func (*Configuration) SaramaConfig ¶ added in v1.2.0
func (conf *Configuration) SaramaConfig() (*sarama.Config, error)
func (*Configuration) SetContext ¶
func (conf *Configuration) SetContext(name string) error
func (*Configuration) TopicConfig ¶
func (conf *Configuration) TopicConfig(topic string) (*TopicConfig, error)
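A hedged end-to-end sketch of building and persisting a configuration; the context name, broker address, and file path are placeholder values, and the os import is assumed.

conf := NewConfiguration()

err := conf.AddContext(ContextConfiguration{
    Name:    "localhost",
    Brokers: []string{"localhost:9092"},
})
if err != nil {
    log.Fatal(err)
}

if err := conf.SetContext("localhost"); err != nil {
    log.Fatal(err)
}

fmt.Println(conf.Brokers("localhost")) // broker list of the named context

saramaConf, err := conf.SaramaConfig() // *sarama.Config derived from the current context
if err != nil {
    log.Fatal(err)
}
_ = saramaConf

f, err := os.Create("config.yml")
if err != nil {
    log.Fatal(err)
}
defer f.Close()

if err := SaveConfiguration(f, conf); err != nil {
    log.Fatal(err)
}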
type ConfluentSchemaRegistry ¶
type ConfluentSchemaRegistry struct {
// contains filtered or unexported fields
}
func NewConfluentSchemaRegistry ¶
func NewConfluentSchemaRegistry(config SchemaRegistryConfiguration) (*ConfluentSchemaRegistry, error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) ConsumeAllPartitions ¶
func (c *Consumer) ConsumeAllPartitions(ctx context.Context, topic string, startOffset int64) (<-chan *sarama.ConsumerMessage, error)
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64) (<-chan *sarama.ConsumerMessage, error)
func (*Consumer) ConsumePartitions ¶
func (c *Consumer) ConsumePartitions(ctx context.Context, topic string, partitions []PartitionOffset) (<-chan *sarama.ConsumerMessage, error)
func (*Consumer) ProcessPartitions ¶
func (c *Consumer) ProcessPartitions(ctx context.Context, topic string, partitions []PartitionOffset, process func(sarama.PartitionConsumer)) error
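A hedged consumption sketch: c is assumed to be a *Consumer obtained from NewConsumer (whose constructor arguments are not shown on this page), and the topic name and offsets are placeholders.

partitions := []PartitionOffset{
    {Partition: 0, Offset: sarama.OffsetOldest},
    {Partition: 1, Offset: 1234},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messages, err := c.ConsumePartitions(ctx, "example", partitions)
if err != nil {
    log.Fatal(err)
}

for msg := range messages {
    fmt.Printf("partition=%d offset=%d value=%q\n", msg.Partition, msg.Offset, msg.Value)
}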
type ContextConfiguration ¶
type ContextConfiguration struct {
    Name           string                      `yaml:"name"`
    Brokers        []string                    `yaml:"brokers"`
    Auth           AuthConfiguration           `yaml:"auth,omitempty"`
    SchemaRegistry SchemaRegistryConfiguration `yaml:"schema_registry,omitempty"`
}
type Decoder ¶
type Decoder interface {
Decode(*sarama.ConsumerMessage) (*Message, error)
}
func NewTopicDecoder ¶
func NewTopicDecoder(topic string, conf Configuration) (Decoder, error)
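A short sketch of topic-aware decoding, assuming conf is a Configuration that declares a schema for the topic and kafkaMsg is a consumed *sarama.ConsumerMessage (both placeholders); NewTopicDecoder presumably selects the Decoder implementation based on the topic's schema configuration.

decoder, err := NewTopicDecoder("example", conf)
if err != nil {
    log.Fatal(err)
}

msg, err := decoder.Decode(kafkaMsg)
if err != nil {
    log.Fatal(err)
}
fmt.Println(msg.Value)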
type Encoder ¶
func NewTopicEncoder ¶
func NewTopicEncoder(topic string, conf Configuration) (Encoder, error)
type GlobalProtoDecoderConfig ¶
type GlobalProtoDecoderConfig struct {
Includes []string `yaml:"includes,omitempty"`
}
type Message ¶
type Message struct {
    Topic     string
    Partition int32
    Offset    int64
    Headers   map[string][]string
    Timestamp time.Time
    Key       any
    Value     any
}
func NewMessage ¶
func NewMessage(m *sarama.ConsumerMessage) *Message
NewMessage creates a new Message from a given Kafka message. The Key and Value are copied into the Message as is (i.e. without decoding).
type OffsetConsumer ¶
type OffsetConsumer struct {
// contains filtered or unexported fields
}
func NewOffsetConsumer ¶
func (*OffsetConsumer) Consume ¶
func (c *OffsetConsumer) Consume(ctx context.Context, topic string, partitions []PartitionOffsets) (<-chan *sarama.ConsumerMessage, error)
type PartitionOffset ¶
type PartitionOffset struct {
    Partition int32
    Offset    int64 // can also be sarama.OffsetNewest or sarama.OffsetOldest
}
PartitionOffset marks a specific offset in a Kafka topic partition.
type PartitionOffsetRange ¶
type PartitionOffsets ¶
type ProtoConfig ¶
type ProtoDecoder ¶
type ProtoDecoder struct {
// contains filtered or unexported fields
}
func NewProtoDecoder ¶
func NewProtoDecoder(conf ProtoConfig) (*ProtoDecoder, error)
func (*ProtoDecoder) Decode ¶
func (d *ProtoDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)
type ProtoEncoder ¶
type ProtoEncoder struct {
// contains filtered or unexported fields
}
func NewProtoEncoder ¶
func NewProtoEncoder(conf ProtoConfig) (*ProtoEncoder, error)
type RangeConsumer ¶
type RangeConsumer struct {
// contains filtered or unexported fields
}
func NewRangeConsumer ¶
func NewRangeConsumer(client sarama.Client, logger *log.Logger) *RangeConsumer
func (*RangeConsumer) Consume ¶
func (c *RangeConsumer) Consume(ctx context.Context, topic string, partitions []PartitionOffsetRange) (<-chan *sarama.ConsumerMessage, error)
type SchemaRegistry ¶
func NewSchemaRegistry ¶
func NewSchemaRegistry(conf Configuration) (SchemaRegistry, error)
type StringDecoder ¶
type StringDecoder struct{}
The StringDecoder assumes that the values of all consumed messages are Unicode strings.
func (*StringDecoder) Decode ¶
func (d *StringDecoder) Decode(kafkaMsg *sarama.ConsumerMessage) (*Message, error)
type StringEncoder ¶
type StringEncoder struct{}
type TopicConfig ¶
type TopicConfig struct {
    Name   string            `yaml:"name"`
    Schema TopicSchemaConfig `yaml:"schema,omitempty"`
}
type TopicProtoConfig ¶
type TopicSchemaConfig ¶
type TopicSchemaConfig struct {
    Type  string           `yaml:"type"` // "avro" or "proto"
    Proto TopicProtoConfig `yaml:"proto,omitempty"`
}