kafkax

package
v0.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2021 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrConsumer               = errors.New("consumer error")
	ErrConsumerTopicsRequest  = errors.New("topics request is not registered")
	ErrConsumerTopicsIsNotSet = errors.New("topics for consumer message is not set")
)
View Source
var (
	ErrProducer              = errors.New("producer error")
	ErrProducerTopicIsNotSet = errors.New("topic for producing message is not set")
)

Functions

This section is empty.

Types

type ClientConsumer

type ClientConsumer struct {
	*cgo.Consumer
}

type ClientConsumerAPI

type ClientConsumerAPI interface {
	Unsubscribe() error
	SubscribeTopics([]string, cgo.RebalanceCb) error
	Events() chan cgo.Event
	Assign([]cgo.TopicPartition) error
	Unassign() error
}

func GetConsumerClient

func GetConsumerClient(bootstrapServer string, groupId string, username string, password string, opts ...Option) (ClientConsumerAPI, error)

type ClientProducer

type ClientProducer struct {
	*cgo.Producer
}

type ClientProducerAPI

type ClientProducerAPI interface {
	Close()
	Events() chan cgo.Event
	Produce(*cgo.Message, chan cgo.Event) error
	Flush(int) int
}

func GetProducerClient

func GetProducerClient(bootstrapServer string, username string, password string, opts ...Option) (ClientProducerAPI, error)

type Consumer

type Consumer struct {
	ClientConsumerAPI
	// contains filtered or unexported fields
}

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) (<-chan Event, error)

func (*Consumer) GetTopics

func (c *Consumer) GetTopics() []string

func (*Consumer) SetTopics

func (c *Consumer) SetTopics(topics ...string) (ConsumerAPI, error)

type ConsumerAPI

type ConsumerAPI interface {
	SetTopics(...string) (ConsumerAPI, error)
	GetTopics() []string
	Consume(context.Context) (<-chan Event, error)
}

type Event

type Event struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Headers    []Header
	Timestamp  time.Time
}

Client Event

type Header struct {
	Key   string // Header name (utf-8 string)
	Value []byte // Header value (nil, empty, or binary)
}

Header Payload

type Kafka

type Kafka struct {
	BootstrapServer string `json:"boostrapserver" mapstructure:"boostrapserver"`
	Username        string `json:"username" mapstructure:"username"`
	Password        string `json:"password" mapstructure:"password"`
	GroupId         string `json:"groupid" mapstructure:"groupid"`
}

func (*Kafka) NewConsumer

func (cfg *Kafka) NewConsumer(opts ...Option) (ConsumerAPI, error)

func (*Kafka) NewProducer

func (cfg *Kafka) NewProducer(opts ...Option) (ProducerAPI, error)

type MockClientConsumer

type MockClientConsumer struct {
	ClientConsumerAPI
	ErrUnsubscribe       error
	ErrSubscribeTopics   error
	ErrAssignPartition   error
	ErrUnAssignPartition error
	Event                cgo.Event
}

func (*MockClientConsumer) Assign

func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)

func (*MockClientConsumer) Events

func (c *MockClientConsumer) Events() chan cgo.Event

func (*MockClientConsumer) SubscribeTopics

func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)

func (*MockClientConsumer) Unassign

func (c *MockClientConsumer) Unassign() (err error)

func (*MockClientConsumer) Unsubscribe

func (c *MockClientConsumer) Unsubscribe() (err error)

type MockClientProducer

type MockClientProducer struct {
	ClientProducerAPI
	ErrProduce error
	Event      cgo.Event
}

func (*MockClientProducer) Close

func (p *MockClientProducer) Close()

func (*MockClientProducer) Events

func (p *MockClientProducer) Events() chan cgo.Event

func (*MockClientProducer) Flush

func (p *MockClientProducer) Flush(timeoutMs int) int

func (*MockClientProducer) Produce

func (p *MockClientProducer) Produce(msg *cgo.Message, deliveryChan chan cgo.Event) (err error)

type Option

type Option func(*Options)

Option

func Async

func Async(b bool) Option

Async option

func AuthKafka

func AuthKafka(b bool) Option

AuthKafka option

func ConfigMapKey

func ConfigMapKey(s string) Option

ConfigMapKey option

func Debugs

func Debugs(d []string) Option

Debugs option

func EarliestOffset

func EarliestOffset() Option

Earliest Offset Strategy ...

func FullStats

func FullStats(b bool) Option

FullStats option

func GroupInstanceID

func GroupInstanceID(s string) Option

GroupInstanceID option

func MaxBytes

func MaxBytes(b int) Option

MaxBytes option

func MaxPollInterval

func MaxPollInterval(t time.Duration) Option

MaxPollInterval option

func MaxWait

func MaxWait(t time.Duration) Option

MaxWait option

func Mechanisms

func Mechanisms(m string) Option

Mechanisms option

func MinBytes

func MinBytes(b int) Option

MinBytes option

func NumPartitions

func NumPartitions(n int) Option

NumPartitions option

func PartitionEof

func PartitionEof(enable bool) Option

PartitionEof option ...

func Protocol

func Protocol(p string) Option

Protocol option

func Rebalance

func Rebalance(enable bool) Option

Rebalance option ...

func ReplicationFactor

func ReplicationFactor(r int) Option

ReplicationFactor option

func SessionTimeout

func SessionTimeout(t time.Duration) Option

SessionTimeout option

func StatInterval

func StatInterval(t time.Duration) Option

StatInterval option

func WriteTimeout

func WriteTimeout(t time.Duration) Option

WriteTimeout option

type Options

type Options struct {
	Protocol           string
	Mechanisms         string
	Async              bool
	SessionTimeout     time.Duration
	MaxPollInterval    time.Duration
	WriteTimeout       time.Duration
	ReadTimeout        time.Duration
	BatchTimeout       time.Duration
	MaxWait            time.Duration
	StatInterval       time.Duration
	NumPartitions      int
	ReplicationFactor  int
	MinBytes           int
	MaxBytes           int
	AuthKafka          bool
	FullStats          bool
	Debugs             []string
	GroupInstanceID    string
	ConfigMapKey       string
	AutoOffsetReset    string
	EnableRebalance    bool
	EnablePartitionEof bool
}

Options ...

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions ...

type Producer

type Producer struct {
	ClientProducerAPI
	// contains filtered or unexported fields
}

func (*Producer) GetTopic

func (p *Producer) GetTopic() string

func (*Producer) Produce

func (p *Producer) Produce(key string, value []byte, opts ...Option) error

func (*Producer) SetTopic

func (p *Producer) SetTopic(topic string) ProducerAPI

type ProducerAPI

type ProducerAPI interface {
	SetTopic(string) ProducerAPI
	GetTopic() string
	Produce(string, []byte, ...Option) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL