kafka

package
v1.2.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
// Acknowledgement mode values.
// NOTE(review): presumably these are the accepted values of ConsumerConfig.Ack
// (documented there as the "ack type") — confirm against the implementation.
const (
	// NOTE(review): name suggests the message is acknowledged automatically
	// before the handler runs — confirm.
	ACK_BEFORE_AUTO    = 0
	// NOTE(review): name suggests the message is acknowledged only after the
	// handler returns without error — confirm.
	ACK_AFTER_NOERROR  = 1
	// NOTE(review): name suggests the message is acknowledged after the
	// handler returns, whether or not it reported an error — confirm.
	ACK_AFTER_NOMATTER = 2
)

Variables

This section is empty.

Functions

func Close added in v1.1.19

func Close()

func SetupConsumer added in v1.1.19

func SetupConsumer(opt *ConsumerConfig) (err error)

func SetupProducer added in v1.1.19

func SetupProducer(opt *ProducerConfig) (err error)

Types

type ByteEncoder added in v1.1.19

// ByteEncoder is an alias for sarama.ByteEncoder.
type ByteEncoder = sarama.ByteEncoder

type Consumer added in v1.1.19

// Consumer is the interface for consuming messages; instances are returned by
// GetConsumer.
type Consumer interface {
	// Close closes the consumer and reports any error from doing so.
	Close() error
	// Consume blocks while consuming messages. It takes its topics as a
	// single string, a message handler mh and an error handler eh.
	// NOTE(review): presumably mh is invoked per message and eh per
	// consumption error; the format of the topics string (one topic vs. a
	// delimited list) is not visible here — confirm.
	Consume(topics string, mh ConsumerMessageHandler, eh ConsumerErrorHandler) error
	// ConsumeM is the variant of Consume that takes the topics as a slice.
	// NOTE(review): presumably it blocks in the same way as Consume — confirm.
	ConsumeM(topics []string, mh ConsumerMessageHandler, eh ConsumerErrorHandler) error
}

func GetConsumer added in v1.1.19

func GetConsumer(key string) Consumer

type ConsumerConfig added in v1.1.19

// ConsumerConfig is the option struct accepted by SetupConsumer.
//
// Fields without an inline comment:
//   - Key: NOTE(review): presumably the name the consumer is registered under
//     and later looked up by via GetConsumer(key) — confirm.
//   - Offset: NOTE(review): presumably the initial offset to consume from
//     (e.g. sarama's OffsetNewest/OffsetOldest) — confirm.
//   - Password: the password that accompanies User for SASL authentication.
//   - KeepAlive: NOTE(review): presumably the connection keep-alive period —
//     confirm.
type ConsumerConfig struct {
	Key          string
	Address      []string // Kafka addresses
	Group        string   // consumer group ID
	Offset       int64
	Ack          int    // ack type; NOTE(review): presumably one of the ACK_* constants — confirm
	User         string // username and password for SASL/PLAIN or SASL/SCRAM authentication
	Password     string
	DialTimeout  time.Duration // How long to wait for the initial connection.
	ReadTimeout  time.Duration // How long to wait for a response.
	WriteTimeout time.Duration // How long to wait for a transmit.
	KeepAlive    time.Duration
	Version      *sarama.KafkaVersion // Kafka version
}

type ConsumerError added in v1.1.19

// ConsumerError is an alias for sarama.ConsumerError.
type ConsumerError = sarama.ConsumerError

type ConsumerErrorHandler added in v1.1.19

// ConsumerErrorHandler is a callback that receives an error. It is the type
// of the eh parameter of Consumer.Consume and Consumer.ConsumeM.
type ConsumerErrorHandler func(err error)

type ConsumerMessage added in v1.1.19

// ConsumerMessage is an alias for sarama.ConsumerMessage.
type ConsumerMessage = sarama.ConsumerMessage

type ConsumerMessageHandler added in v1.1.19

// ConsumerMessageHandler is a callback that receives a *ConsumerMessage and
// returns an error. It is the type of the mh parameter of Consumer.Consume
// and Consumer.ConsumeM.
// NOTE(review): how a non-nil return value affects acknowledgement is not
// visible here — confirm against the implementation.
type ConsumerMessageHandler func(msg *ConsumerMessage) error

type Producer added in v1.1.19

// Producer is the interface for producing messages; instances are returned by
// GetProducer.
type Producer interface {
	// Close closes the producer and reports any error from doing so.
	Close() error
	// Produce takes one or more messages and returns an error.
	// NOTE(review): whether it sends synchronously or only enqueues when the
	// producer is configured as async is not visible here — confirm.
	Produce(msgs ...*ProducerMessage) error
	// AsyncHandle takes a message handler mh and an error handler eh.
	// NOTE(review): presumably these receive the success and error results of
	// an async producer, and the names in the trailing comment correspond to
	// ProducerConfig.ReturnSuccess / ProducerConfig.ReturnError — confirm.
	AsyncHandle(mh ProducerMessageHandler, eh ProducerErrorHandler) // asyncReturnSuccess or asyncReturnError must be set
}

func GetProducer added in v1.1.19

func GetProducer(key string) Producer

type ProducerConfig added in v1.1.19

// ProducerConfig is the option struct accepted by SetupProducer.
//
// Fields without an inline comment:
//   - Key: NOTE(review): presumably the name the producer is registered under
//     and later looked up by via GetProducer(key) — confirm.
//   - Async: NOTE(review): presumably selects an asynchronous producer —
//     confirm.
//   - ReturnSuccess, ReturnError: NOTE(review): presumably control whether
//     successes / errors are reported back for Producer.AsyncHandle — confirm.
//   - Password: the password that accompanies User for SASL authentication.
type ProducerConfig struct {
	Key           string
	Address       []string // Kafka addresses
	Async         bool
	ReturnSuccess bool
	ReturnError   bool
	User          string // username and password for SASL/PLAIN or SASL/SCRAM authentication
	Password      string
	Version       *sarama.KafkaVersion // Kafka version
}

type ProducerError added in v1.1.19

// ProducerError is an alias for sarama.ProducerError.
type ProducerError = sarama.ProducerError

type ProducerErrorHandler added in v1.1.19

// ProducerErrorHandler is a callback that receives a *ProducerError. It is
// the type of the eh parameter of Producer.AsyncHandle.
type ProducerErrorHandler func(err *ProducerError)

type ProducerMessage added in v1.1.19

// ProducerMessage is an alias for sarama.ProducerMessage.
type ProducerMessage = sarama.ProducerMessage

type ProducerMessageHandler added in v1.1.19

// ProducerMessageHandler is a callback that receives a *ProducerMessage. It
// is the type of the mh parameter of Producer.AsyncHandle.
type ProducerMessageHandler func(msg *ProducerMessage)

type StringEncoder added in v1.1.19

// StringEncoder is an alias for sarama.StringEncoder.
type StringEncoder = sarama.StringEncoder

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL