kafka

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const PartitionAny = -1

Variables

This section is empty.

Functions

This section is empty.

Types

type Admin

type Admin interface {
	FetchInfo(topics []string) (map[string]*Topic, error)
	CreateTopics(topics []*Topic) error
	ListTopics() ([]string, error)
	ApplyConfigs() error
	StoreConfigs(topics []*Topic) error
	DeleteTopics(topics []string) error
	Close()
}

type Assignment

type Assignment interface {
	TPs() TopicPartitions
	ResetOffset(tp TopicPartition, offset Offset)
}

type ConsumerBuilder

type ConsumerBuilder func(func(config *ConsumerConfig)) (PartitionConsumer, error)

type ConsumerConfig

type ConsumerConfig struct {
	Id                      string
	BootstrapServers        []string
	IsolationLevel          IsolationLevel
	TopicMetaFetchTimeout   time.Duration
	EOSEnabled              bool
	MaxPollInterval         time.Duration
	ConsumerMessageChanSize int
	SecurityProtocol        string
	SSL                     SSLCfg
	SASL                    SaslCfg
	Logger                  log.Logger
	MetricsReporter         metrics.Reporter
	TracerProvider          sdktrace.TracerProvider
	TraceContext            propagation.TraceContext
	ContextExtractor        RecordContextBinderFunc
}

func NewPartitionConsumerConfig

func NewPartitionConsumerConfig() *ConsumerConfig

func (*ConsumerConfig) Copy

func (conf *ConsumerConfig) Copy() *ConsumerConfig

type ConsumerOffset

type ConsumerOffset struct {
	Topic     string
	Partition int32
	Offset    int64
	Meta      string
}

func (*ConsumerOffset) String

func (off *ConsumerOffset) String() string

type ConsumerProvider

type ConsumerProvider interface {
	NewBuilder(config *ConsumerConfig) ConsumerBuilder
}

type DeliveryReport

type DeliveryReport interface {
	Topic() string
	Partition() int32
	Offset() int64
	Error() error
}

type Error

type Error struct {
	Err error
}

func (*Error) Error

func (p *Error) Error() string

func (*Error) String

func (p *Error) String() string

type Event

type Event interface {
	String() string
}

type GroupConsumer

type GroupConsumer interface {
	// Subscribe subscribes to a list of topic with a user provided RebalanceHandler
	Subscribe(tps []string, handler RebalanceHandler) error
	// Unsubscribe signals the consumer to unsubscribe from group
	Unsubscribe() error
	Errors() <-chan error
}

GroupConsumer is a wrapper for a kafka group consumer adaptor.

type GroupConsumerBuilder

type GroupConsumerBuilder func(func(config *GroupConsumerConfig)) (GroupConsumer, error)

type GroupConsumerConfig

type GroupConsumerConfig struct {
	*ConsumerConfig
	GroupId string
	Offsets struct {
		Initial Offset
		Commit  struct {
			Auto     bool
			Interval time.Duration
		}
	}
}

func NewConfig

func NewConfig() *GroupConsumerConfig

func (*GroupConsumerConfig) Copy

type GroupConsumerProvider

type GroupConsumerProvider interface {
	NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}

type GroupConsumerStatus

type GroupConsumerStatus string
const (
	ConsumerPending     GroupConsumerStatus = `Pending`
	ConsumerRebalancing GroupConsumerStatus = `Rebalancing`
	ConsumerReady       GroupConsumerStatus = `Ready`
)

type GroupMeta

type GroupMeta struct {
	Meta interface{}
}

GroupMeta wraps consumer group metadata used in transactional producer commits.

type GroupSession

type GroupSession interface {
	Assignment() Assignment
	GroupMeta() (*GroupMeta, error)
	TopicMeta() (TopicMeta, error)
	MarkOffset(ctx context.Context, record Record, meta string) error
	CommitOffset(ctx context.Context, record Record, meta string) error
}

type IsolationLevel

type IsolationLevel int8
const (
	ReadUncommitted IsolationLevel = iota
	ReadCommitted
)

type Offset

type Offset int64
const (
	OffsetEarliest Offset = -2
	OffsetLatest   Offset = -1
	OffsetStored   Offset = -3
	OffsetUnknown  Offset = -4
)

func (Offset) String

func (o Offset) String() string

type OffsetManager

type OffsetManager interface {
	OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
	GetOffsetLatest(topic string, partition int32) (offset int64, err error)
	GetOffsetOldest(topic string, partition int32) (offset int64, err error)
	Close() error
}

type OffsetManagerBuilder

type OffsetManagerBuilder func(func(config *OffsetManagerConfig)) (OffsetManager, error)

type OffsetManagerConfig

type OffsetManagerConfig struct {
	Id               string
	BootstrapServers []string

	Logger          log.Logger
	MetricsReporter metrics.Reporter
}

func NewOffsetManagerConfig

func NewOffsetManagerConfig() *OffsetManagerConfig

type Partition

type Partition interface {
	Events() <-chan Event
	BeginOffset() Offset
	EndOffset() Offset
	Close() error
}

type PartitionClaim

type PartitionClaim interface {
	TopicPartition() TopicPartition
	Records() <-chan Record
}

type PartitionConf

type PartitionConf struct {
	Id    int32
	Error error
}

type PartitionConsumer

type PartitionConsumer interface {
	ConsumeTopic(ctx context.Context, topic string, offset Offset) (map[int32]Partition, error)
	Partitions(ctx context.Context, topic string) ([]int32, error)
	ConsumePartition(ctx context.Context, topic string, partition int32, offset Offset) (Partition, error)
	OffsetValid(topic string, partition int32, offset int64) (isValid bool, err error)
	GetOffsetLatest(topic string, partition int32) (offset int64, err error)
	GetOffsetOldest(topic string, partition int32) (offset int64, err error)
	Close() error
}

type PartitionEnd

type PartitionEnd struct {
	Tps []TopicPartition
}

func (*PartitionEnd) String

func (p *PartitionEnd) String() string

func (*PartitionEnd) TopicPartitions

func (p *PartitionEnd) TopicPartitions() []TopicPartition

type PartitionerFunc

type PartitionerFunc func(record Record, numPartitions int32) (int32, error)

type PartitionerType

type PartitionerType string

type Producer

type Producer interface {
	NewRecord(
		ctx context.Context,
		key, value []byte,
		topic string,
		partition int32,
		timestamp time.Time,
		headers RecordHeaders,
		meta string) Record
	ProduceSync(ctx context.Context, record Record) (partition int32, offset int64, err error)
	ProduceDlt(ctx context.Context, record Record) (partition int32, offset int64, err error)
	Restart() error
	Close() error
	HasDltTopic() bool
	DltTopic() string
}

type ProducerBuilder

type ProducerBuilder func(conf func(*ProducerConfig)) (Producer, error)

type ProducerConfig

type ProducerConfig struct {
	Id               string
	BootstrapServers []string
	SecurityProtocol string
	SSL              SSLCfg
	SASL             SaslCfg
	PartitionerFunc  PartitionerFunc
	Acks             RequiredAcks
	Transactional    struct {
		Enabled bool
		Id      string
	}
	Idempotent      bool
	Logger          log.Logger
	MetricsReporter metrics.Reporter
	TracerProvider  trace.TracerProvider
	DltTopic        string
}

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

func (*ProducerConfig) Copy

func (conf *ProducerConfig) Copy() *ProducerConfig

type ProducerErr

type ProducerErr interface {
	error
	RequiresRestart() bool
	TxnRequiresAbort() bool
}

type ProducerFactory

type ProducerFactory interface {
	NewBuilder(config *GroupConsumerConfig) GroupConsumerBuilder
}

type ProducerProvider

type ProducerProvider interface {
	NewBuilder(config *ProducerConfig) ProducerBuilder
}

type RebalanceHandler

type RebalanceHandler interface {
	OnPartitionRevoked(ctx context.Context, session GroupSession) error
	OnPartitionAssigned(ctx context.Context, session GroupSession) error
	OnLost() error
	Consume(ctx context.Context, session GroupSession, partition PartitionClaim) error
}

type Record

type Record interface {
	Ctx() context.Context
	Key() []byte
	Value() []byte
	Topic() string
	Partition() int32
	Offset() int64
	Timestamp() time.Time
	Headers() RecordHeaders
	String() string
	SetHeaders(rec []RecordHeader) error
}

type RecordContextBinderFunc

type RecordContextBinderFunc func(record Record) context.Context

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader stores key and value for a record header.

type RecordHeaders

type RecordHeaders []RecordHeader

RecordHeaders are list of key:value pairs.

func (RecordHeaders) Read

func (h RecordHeaders) Read(key []byte) []byte

Read returns a RecordHeader by its name or nil if not exist

type RecordMeta

type RecordMeta struct {
	Topic     string
	Partition int32
	Offset    int64
	Timestamp time.Time
	Headers   RecordHeaders
}

type RequiredAcks

type RequiredAcks int
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0

	// WaitForLeader waits for only the local commit to succeed before responding.
	WaitForLeader RequiredAcks = 1

	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

func (RequiredAcks) String

func (ack RequiredAcks) String() string

type SSLCfg

type SSLCfg struct {
	CaLocation string `mapstructure:"ca-location" json:"ca-location" yaml:"ca-location"`
	SkipVerify bool   `json:"skv,omitempty" yaml:"skv,omitempty" mapstructure:"skv,omitempty"`
}

type SaslCfg

type SaslCfg struct {
	Mechanisms string `mapstructure:"mechanisms" json:"mechanisms" yaml:"mechanisms"`
	Username   string `mapstructure:"username" json:"username" yaml:"username"`
	Password   string `mapstructure:"password" json:"password" yaml:"password"`
	CaLocation string `json:"ca-location" mapstructure:"ca-location" yaml:"ca-location"`
	SkipVerify bool   `json:"skv,omitempty" mapstructure:"skv,omitempty" yaml:"skv,omitempty"`
}

type Topic

type Topic struct {
	Name              string
	Partitions        []PartitionConf
	Error             error
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]string
}

type TopicConfig

type TopicConfig struct {
	NumPartitions     int32
	ReplicationFactor int16
	ConfigEntries     map[string]string
	Internal          bool
	AutoCreate        bool
}

type TopicMeta

type TopicMeta []TopicPartition

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
}

TopicPartition represents a kafka topic partition.

func (TopicPartition) String

func (tp TopicPartition) String() string

type TopicPartitions

type TopicPartitions []TopicPartition

func (TopicPartitions) Len

func (list TopicPartitions) Len() int

Len is part of sort.Interface.

func (TopicPartitions) Less

func (list TopicPartitions) Less(i, j int) bool

Less is part of sort.Interface.

func (TopicPartitions) Swap

func (list TopicPartitions) Swap(i, j int)

Swap is part of sort.Interface.

type TransactionalProducer

type TransactionalProducer interface {
	Producer
	ProduceAsync(ctx context.Context, record Record) (err error)
	InitTransactions(ctx context.Context) error
	BeginTransaction() error
	SendOffsetsToTransaction(ctx context.Context, offsets []ConsumerOffset, meta *GroupMeta) error
	CommitTransaction(ctx context.Context) error
	AbortTransaction(ctx context.Context) error
}

Directories

Path Synopsis
adaptors

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL