Documentation ¶
Index ¶
- Variables
- type ClientConsumer
- type ClientConsumerAPI
- type ClientProducer
- type ClientProducerAPI
- type Consumer
- type ConsumerAPI
- type Event
- type Header
- type Kafka
- type MockClientConsumer
- func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)
- func (c *MockClientConsumer) Events() chan cgo.Event
- func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)
- func (c *MockClientConsumer) Unassign() (err error)
- func (c *MockClientConsumer) Unsubscribe() (err error)
- type MockClientProducer
- type Option
- func Async(b bool) Option
- func AuthKafka(b bool) Option
- func ConfigMapKey(s string) Option
- func Debugs(d []string) Option
- func EarliestOffset() Option
- func FullStats(b bool) Option
- func GroupInstanceID(s string) Option
- func MaxBytes(b int) Option
- func MaxPollInterval(t time.Duration) Option
- func MaxWait(t time.Duration) Option
- func Mechanisms(m string) Option
- func MinBytes(b int) Option
- func NumPartitions(n int) Option
- func PartitionEOF(enable bool) Option
- func Protocol(p string) Option
- func Rebalance(enable bool) Option
- func ReplicationFactor(r int) Option
- func SessionTimeout(t time.Duration) Option
- func StatInterval(t time.Duration) Option
- func WithHeaders(headers []Header) Option
- func WriteTimeout(t time.Duration) Option
- type Options
- type Producer
- type ProducerAPI
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConsumer ... ErrConsumer = errors.New("consumer error") // ErrConsumerTopicsRequest ... ErrConsumerTopicsRequest = errors.New("topics request is not registered") // ErrConsumerTopicsIsNotSet ... ErrConsumerTopicsIsNotSet = errors.New("topics for consumer message is not set") )
var ( // ErrProducer ... ErrProducer = errors.New("producer error") // ErrProducerTopicIsNotSet ... ErrProducerTopicIsNotSet = errors.New("topic for producing message is not set") )
Functions ¶
This section is empty.
Types ¶
type ClientConsumer ¶
ClientConsumer is the internal Kafka client consumer
type ClientConsumerAPI ¶
type ClientConsumerAPI interface { Unsubscribe() error SubscribeTopics([]string, cgo.RebalanceCb) error Events() chan cgo.Event Assign([]cgo.TopicPartition) error Unassign() error }
ClientConsumerAPI is the internal client consumer API
func GetConsumerClient ¶
func GetConsumerClient(bootstrapServer string, groupID string, username string, password string, opts ...Option) (ClientConsumerAPI, error)
GetConsumerClient returns a ClientConsumerAPI
type ClientProducer ¶
ClientProducer is the internal Kafka client producer
type ClientProducerAPI ¶
type ClientProducerAPI interface { Close() Events() chan cgo.Event Produce(*cgo.Message, chan cgo.Event) error Flush(int) int }
ClientProducerAPI is the internal client producer API
func GetProducerClient ¶
func GetProducerClient(bootstrapServer string, username string, password string, opts ...Option) (ClientProducerAPI, error)
GetProducerClient returns a ClientProducerAPI
type Consumer ¶
type Consumer struct { ClientConsumerAPI // contains filtered or unexported fields }
Consumer is the public Kafka client consumer
type ConsumerAPI ¶
type ConsumerAPI interface { SetTopics(...string) (ConsumerAPI, error) GetTopics() []string Consume(context.Context) (<-chan Event, error) }
ConsumerAPI is the public client consumer API
type Event ¶
type Event struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Headers []Header
Timestamp time.Time
}
Event is the Kafka message content
type Header ¶
type Header struct { Key string // Header name (utf-8 string) Value []byte // Header value (nil, empty, or binary) }
Header Payload
type Kafka ¶
type Kafka struct { BootstrapServer string `json:"boostrapserver" mapstructure:"boostrapserver"` Username string `json:"username" mapstructure:"username"` Password string `json:"password" mapstructure:"password"` }
Kafka input structure
func (*Kafka) NewConsumer ¶
func (cfg *Kafka) NewConsumer(groupID string, opts ...Option) (ConsumerAPI, error)
NewConsumer creates a ConsumerAPI
func (*Kafka) NewProducer ¶
func (cfg *Kafka) NewProducer(opts ...Option) (ProducerAPI, error)
NewProducer creates a ProducerAPI
type MockClientConsumer ¶
type MockClientConsumer struct { ClientConsumerAPI ErrUnsubscribe error ErrSubscribeTopics error ErrAssignPartition error ErrUnAssignPartition error Event cgo.Event }
MockClientConsumer is the internal consumer mock client
func (*MockClientConsumer) Assign ¶
func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)
Assign is an internal mock method
func (*MockClientConsumer) Events ¶
func (c *MockClientConsumer) Events() chan cgo.Event
Events is an internal mock method
func (*MockClientConsumer) SubscribeTopics ¶
func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)
SubscribeTopics is an internal mock method
func (*MockClientConsumer) Unassign ¶
func (c *MockClientConsumer) Unassign() (err error)
Unassign is an internal mock method
func (*MockClientConsumer) Unsubscribe ¶
func (c *MockClientConsumer) Unsubscribe() (err error)
Unsubscribe is an internal mock method
type MockClientProducer ¶
type MockClientProducer struct { ClientProducerAPI ErrProduce error Event cgo.Event }
MockClientProducer is the internal producer mock client
func (*MockClientProducer) Close ¶
func (p *MockClientProducer) Close()
Close is an internal mock method
func (*MockClientProducer) Events ¶
func (p *MockClientProducer) Events() chan cgo.Event
Events is an internal mock method
func (*MockClientProducer) Flush ¶
func (p *MockClientProducer) Flush(timeoutMs int) int
Flush is an internal mock method
type Options ¶
type Options struct { Protocol string Mechanisms string Async bool SessionTimeout time.Duration MaxPollInterval time.Duration WriteTimeout time.Duration ReadTimeout time.Duration BatchTimeout time.Duration MaxWait time.Duration StatInterval time.Duration NumPartitions int ReplicationFactor int MinBytes int MaxBytes int AuthKafka bool FullStats bool Debugs []string GroupInstanceID string ConfigMapKey string AutoOffsetReset string EnableRebalance bool EnablePartitionEOF bool Headers []Header }
Options ...
type Producer ¶
type Producer struct { ClientProducerAPI // contains filtered or unexported fields }
Producer is the public Kafka client producer
func (*Producer) SetTopic ¶
func (p *Producer) SetTopic(topic string) ProducerAPI
SetTopic assign topic to the producer
type ProducerAPI ¶
type ProducerAPI interface { SetTopic(string) ProducerAPI GetTopic() string Produce(string, []byte, ...Option) error }
ProducerAPI is the public client producer API