kafka

package
Version: v0.0.11 (not the latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 | License: Apache-2.0 | Imports: 14 | Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Factory = FactoryType{}
View Source
var Fatal = false
View Source
var SlowProducerTimeout time.Duration = 2 * time.Second

Functions

func EnsureTopic

func EnsureTopic(topic string, kafkaUrl string, knownTopics *map[string]bool, configMap map[string][]kafka.ConfigEntry, partitions int, replicationFactor int) (err error)

func GetBroker

func GetBroker(bootstrapUrl string) (brokers []string, err error)

func GetTopicConfig

func GetTopicConfig(configMap map[string][]kafka.ConfigEntry, topic string) []kafka.ConfigEntry

func InitTopic

func InitTopic(kafkaUrl string, configMap map[string][]kafka.ConfigEntry, topics ...string) (err error)

func InitTopicWithConfig

func InitTopicWithConfig(bootstrapUrl string, configMap map[string][]kafka.ConfigEntry, numPartitions int, replicationFactor int, topics ...string) (err error)

func NewConsumer

func NewConsumer(ctx context.Context, config ConsumerConfig, listener func(topic string, msg []byte, time time.Time) error, errorhandler func(err error)) (err error)

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

func (*AsyncProducer) Log

func (this *AsyncProducer) Log(logger *log.Logger)

func (*AsyncProducer) Produce

func (this *AsyncProducer) Produce(topic string, message string) (err error)

func (*AsyncProducer) ProduceWithKey

func (this *AsyncProducer) ProduceWithKey(topic string, key string, message string) (err error)

type ConsumerConfig

type ConsumerConfig struct {
	KafkaUrl       string
	GroupId        string
	Topic          string
	MinBytes       int
	MaxBytes       int
	MaxWait        time.Duration
	TopicConfigMap map[string][]kafka.ConfigEntry
}

type FactoryType

type FactoryType struct{}

func (FactoryType) NewConsumer

func (FactoryType) NewConsumer(ctx context.Context, config util.Config, responseListener func(msg string) error, errorListener func(msg string) error) (err error)

func (FactoryType) NewProducer

func (FactoryType) NewProducer(ctx context.Context, config util.Config) (com.ProducerInterface, error)

type ProducerConfig

type ProducerConfig struct {
	AsyncFlushFrequency time.Duration
	AsyncCompression    sarama.CompressionCodec
	SyncCompression     sarama.CompressionCodec
	Sync                bool
	SyncIdempotent      bool
	PartitionNum        int
	ReplicationFactor   int
	AsyncFlushMessages  int
	TopicConfigMap      map[string][]kafka.ConfigEntry
}

type ProducerInterface

type ProducerInterface interface {
	Produce(topic string, message string) (err error)
	ProduceWithKey(topic string, key string, message string) (err error)
	Log(logger *log.Logger)
}

func PrepareProducer

func PrepareProducer(ctx context.Context, kafkaBootstrapUrl string, sync bool, syncIdempotent bool, partitionNum int, replicationFactor int) (result ProducerInterface, err error)

Deprecated: PrepareProducer is marked as deprecated.

func PrepareProducerWithConfig

func PrepareProducerWithConfig(ctx context.Context, kafkaBootstrapUrl string, config ProducerConfig) (result ProducerInterface, err error)

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

func (*SyncProducer) Log

func (this *SyncProducer) Log(logger *log.Logger)

func (*SyncProducer) Produce

func (this *SyncProducer) Produce(topic string, message string) (err error)

func (*SyncProducer) ProduceWithKey

func (this *SyncProducer) ProduceWithKey(topic string, key string, message string) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL