kafka

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2020 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	KafkaReconnectDuration        = 1
	KafkaQueStatusRefreshDuration = 60
)

Constants

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Topic   string
	GroupID string
}

Config kafka config

func (*Config) Equals

func (me *Config) Equals(to *Config) bool

Equals check

type ConsumerCallback

type ConsumerCallback func(ConsumerMessage) *mqenv.MQPublishMessage

ConsumerCallback callback

type ConsumerMessage

type ConsumerMessage struct {
	Topic     string
	Partition int
	Key       []byte
	Value     []byte
	Offset    int64
	Headers   map[string][]byte
	Time      time.Time
}

ConsumerMessage struct

type ConsumerProxy

type ConsumerProxy struct {
	Topic       string
	Callback    ConsumerCallback
	ConsumerTag string
	AutoAck     bool
	Exclusive   bool
	NoLocal     bool
	NoWait      bool
}

ConsumerProxy kafka

func GenerateKafkaConsumerProxy

func GenerateKafkaConsumerProxy(consumeProxy *mqenv.MQConsumerProxy) *ConsumerProxy

GenerateKafkaConsumerProxy geenrate kafak consumer proxy

type InstStats

type InstStats struct {
	Bytes         int64  `json:"bytes"`
	Dials         int64  `json:"connections"`
	Topic         string `json:"topic"`
	Messages      int64  `json:"messages"`
	Rebalances    int64  `json:"rebalances"`
	Errors        int64  `json:"errors"`
	Timeouts      int64  `json:"timeouts"`
	ClientID      string `json:"clientID"`
	QueueLength   int64  `json:"queueLength"`
	QueueCapacity int64  `json:"queueCapacity"`
}

InstStats stats

type Kafka

type Kafka struct {
	Name       string
	Publish    chan *PublishingMsg
	Consume    chan *ConsumerProxy
	Done       chan error
	Reader     *kafka.Reader
	Writer     *kafka.Writer
	Config     *Config
	ConnConfig *mqenv.MQConnectorConfig
	Close      chan interface{}
	// contains filtered or unexported fields
}

Kafka instance

func GetKafka

func GetKafka(name string) (*Kafka, error)

GetKafka get instance

func InitKafka

func InitKafka(mqConnName string, connCfg *mqenv.MQConnectorConfig, kafkaCfg *Config) (*Kafka, error)

InitKafka init kafak

func (*Kafka) Run

func (r *Kafka) Run()

Run start

func (*Kafka) Stats

func (r *Kafka) Stats() Stats

Stats stats

type PublishingMsg

type PublishingMsg struct {
	Body          []byte
	Key           []byte
	Topic         string
	Partition     int
	Offset        int64
	Headers       map[string][]byte
	PublishStatus chan mqenv.MQEvent `json:"-"`
	EventLabel    string             `json:"eventLabel"`
}

PublishingMsg publishing msg

func GenerateKafkaPublishMessage

func GenerateKafkaPublishMessage(publishMsg *mqenv.MQPublishMessage, topic string) *PublishingMsg

GenerateKafkaPublishMessage generate publish message

type Stats

type Stats struct {
	Consumer InstStats `json:"consumer"`
	Producer InstStats `json:"producer"`
}

Stats struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL