Documentation
¶
Index ¶
- Variables
- type ClientConsumer
- type ClientConsumerAPI
- type ClientProducer
- type ClientProducerAPI
- type Consumer
- type ConsumerAPI
- type Event
- type Header
- type Kafka
- type MockClientConsumer
- func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)
- func (c *MockClientConsumer) Events() chan cgo.Event
- func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)
- func (c *MockClientConsumer) Unassign() (err error)
- func (c *MockClientConsumer) Unsubscribe() (err error)
- type MockClientProducer
- type Option
- func Async(b bool) Option
- func AuthKafka(b bool) Option
- func ConfigMapKey(s string) Option
- func Debugs(d []string) Option
- func EarliestOffset() Option
- func FullStats(b bool) Option
- func GroupInstanceID(s string) Option
- func MaxBytes(b int) Option
- func MaxPollInterval(t time.Duration) Option
- func MaxWait(t time.Duration) Option
- func Mechanisms(m string) Option
- func MinBytes(b int) Option
- func NumPartitions(n int) Option
- func PartitionEof(enable bool) Option
- func Protocol(p string) Option
- func Rebalance(enable bool) Option
- func ReplicationFactor(r int) Option
- func SessionTimeout(t time.Duration) Option
- func StatInterval(t time.Duration) Option
- func WriteTimeout(t time.Duration) Option
- type Options
- type Producer
- type ProducerAPI
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrConsumer = errors.New("consumer error") ErrConsumerTopicsRequest = errors.New("topics request is not registered") ErrConsumerTopicsIsNotSet = errors.New("topics for consumer message is not set") )
View Source
var ( ErrProducer = errors.New("producer error") ErrProducerTopicIsNotSet = errors.New("topic for producing message is not set") )
Functions ¶
This section is empty.
Types ¶
type ClientConsumer ¶
type ClientConsumerAPI ¶
type ClientConsumerAPI interface { Unsubscribe() error SubscribeTopics([]string, cgo.RebalanceCb) error Events() chan cgo.Event Assign([]cgo.TopicPartition) error Unassign() error }
func GetConsumerClient ¶
type ClientProducer ¶
type ClientProducerAPI ¶
type ClientProducerAPI interface { Close() Events() chan cgo.Event Produce(*cgo.Message, chan cgo.Event) error Flush(int) int }
func GetProducerClient ¶
type Consumer ¶
type Consumer struct { ClientConsumerAPI // contains filtered or unexported fields }
type ConsumerAPI ¶
type Event ¶
type Event struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Headers []Header
Timestamp time.Time
}
Client Event
type Header ¶
type Header struct { Key string // Header name (utf-8 string) Value []byte // Header value (nil, empty, or binary) }
Header Payload
type Kafka ¶
type Kafka struct { BootstrapServer string `json:"boostrapserver" mapstructure:"boostrapserver"` Username string `json:"username" mapstructure:"username"` Password string `json:"password" mapstructure:"password"` GroupId string `json:"groupid" mapstructure:"groupid"` }
func (*Kafka) NewConsumer ¶
func (cfg *Kafka) NewConsumer(opts ...Option) (ConsumerAPI, error)
func (*Kafka) NewProducer ¶
func (cfg *Kafka) NewProducer(opts ...Option) (ProducerAPI, error)
type MockClientConsumer ¶
type MockClientConsumer struct { ClientConsumerAPI ErrUnsubscribe error ErrSubscribeTopics error ErrAssignPartition error ErrUnAssignPartition error Event cgo.Event }
func (*MockClientConsumer) Assign ¶
func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)
func (*MockClientConsumer) Events ¶
func (c *MockClientConsumer) Events() chan cgo.Event
func (*MockClientConsumer) SubscribeTopics ¶
func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)
func (*MockClientConsumer) Unassign ¶
func (c *MockClientConsumer) Unassign() (err error)
func (*MockClientConsumer) Unsubscribe ¶
func (c *MockClientConsumer) Unsubscribe() (err error)
type MockClientProducer ¶
type MockClientProducer struct { ClientProducerAPI ErrProduce error Event cgo.Event }
func (*MockClientProducer) Close ¶
func (p *MockClientProducer) Close()
func (*MockClientProducer) Events ¶
func (p *MockClientProducer) Events() chan cgo.Event
func (*MockClientProducer) Flush ¶
func (p *MockClientProducer) Flush(timeoutMs int) int
type Options ¶
type Options struct { Protocol string Mechanisms string Async bool SessionTimeout time.Duration MaxPollInterval time.Duration WriteTimeout time.Duration ReadTimeout time.Duration BatchTimeout time.Duration MaxWait time.Duration StatInterval time.Duration NumPartitions int ReplicationFactor int MinBytes int MaxBytes int AuthKafka bool FullStats bool Debugs []string GroupInstanceID string ConfigMapKey string AutoOffsetReset string EnableRebalance bool EnablePartitionEof bool }
Options ...
type Producer ¶
type Producer struct { ClientProducerAPI // contains filtered or unexported fields }
func (*Producer) SetTopic ¶
func (p *Producer) SetTopic(topic string) ProducerAPI
type ProducerAPI ¶
Click to show internal directories.
Click to hide internal directories.