kafkax

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2023 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConsumer is a sentinel error whose message is "consumer error".
	// Being a package-level errors.New value, it can be matched with errors.Is.
	ErrConsumer = errors.New("consumer error")
	// ErrConsumerTopicsRequest is a sentinel error whose message is
	// "topics request is not registered".
	ErrConsumerTopicsRequest = errors.New("topics request is not registered")
	// ErrConsumerTopicsIsNotSet is a sentinel error whose message is
	// "topics for consumer message is not set".
	// NOTE(review): presumably returned when consuming before SetTopics has
	// been called — confirm against the implementation.
	ErrConsumerTopicsIsNotSet = errors.New("topics for consumer message is not set")
)
View Source
var (
	// ErrProducer is a sentinel error whose message is "producer error".
	// Being a package-level errors.New value, it can be matched with errors.Is.
	ErrProducer = errors.New("producer error")
	// ErrProducerTopicIsNotSet is a sentinel error whose message is
	// "topic for producing message is not set".
	// NOTE(review): presumably returned when producing before SetTopic has
	// been called — confirm against the implementation.
	ErrProducerTopicIsNotSet = errors.New("topic for producing message is not set")
)

Functions

This section is empty.

Types

type ClientConsumer

// ClientConsumer is the internal Kafka client consumer. It wraps a
// *cgo.Consumer by embedding, so that type's methods are promoted onto
// ClientConsumer.
type ClientConsumer struct {
	*cgo.Consumer
}

ClientConsumer is the internal Kafka client consumer

type ClientConsumerAPI

// ClientConsumerAPI is the internal client consumer API: the set of low-level
// consumer operations this package depends on. MockClientConsumer declares a
// mock version of every method listed here.
// NOTE(review): presumably satisfied by ClientConsumer through its embedded
// *cgo.Consumer — confirm against the implementation.
type ClientConsumerAPI interface {
	// Unsubscribe presumably drops the current topic subscription — confirm
	// against cgo.Consumer.
	Unsubscribe() error
	// SubscribeTopics takes the topic names and a cgo.RebalanceCb callback.
	SubscribeTopics([]string, cgo.RebalanceCb) error
	// Events returns a channel of cgo.Event values.
	Events() chan cgo.Event
	// Assign takes the list of cgo.TopicPartition values to assign.
	Assign([]cgo.TopicPartition) error
	// Unassign presumably clears the assignment made by Assign — confirm.
	Unassign() error
}

ClientConsumerAPI is the internal client consumer API

func GetConsumerClient

func GetConsumerClient(bootstrapServer string, groupID string, username string, password string, opts ...Option) (ClientConsumerAPI, error)

GetConsumerClient returns a ClientConsumerAPI

type ClientProducer

// ClientProducer is the internal Kafka client producer. It wraps a
// *cgo.Producer by embedding, so that type's methods are promoted onto
// ClientProducer.
type ClientProducer struct {
	*cgo.Producer
}

ClientProducer is the internal Kafka client producer

type ClientProducerAPI

// ClientProducerAPI is the internal client producer API: the set of low-level
// producer operations this package depends on. MockClientProducer declares a
// mock version of every method listed here.
// NOTE(review): presumably satisfied by ClientProducer through its embedded
// *cgo.Producer — confirm against the implementation.
type ClientProducerAPI interface {
	// Close takes no arguments and returns nothing.
	Close()
	// Events returns a channel of cgo.Event values.
	Events() chan cgo.Event
	// Produce takes the message to send and a channel of cgo.Event.
	// NOTE(review): the channel is presumably where the delivery report is
	// sent (the mock names this parameter deliveryChan) — confirm.
	Produce(*cgo.Message, chan cgo.Event) error
	// Flush takes an int and returns an int.
	// NOTE(review): the mock names the argument timeoutMs; the result is
	// presumably the number of still-outstanding events — confirm.
	Flush(int) int
}

ClientProducerAPI is the internal client producer API

func GetProducerClient

func GetProducerClient(bootstrapServer string, username string, password string, opts ...Option) (ClientProducerAPI, error)

GetProducerClient returns a ClientProducerAPI

type Consumer

// Consumer is the public Kafka client consumer. It embeds ClientConsumerAPI,
// so the low-level client methods are promoted onto Consumer; *Consumer also
// declares SetTopics, GetTopics and Consume, the methods of ConsumerAPI.
type Consumer struct {
	ClientConsumerAPI
	// contains filtered or unexported fields
}

Consumer is the public Kafka client consumer

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context) (<-chan Event, error)

Consume consumes messages for the consumer, returning a receive-only channel of Events and an error.

func (*Consumer) GetTopics

func (c *Consumer) GetTopics() []string

GetTopics returns the topics assigned to the consumer.

func (*Consumer) SetTopics

func (c *Consumer) SetTopics(topics ...string) (ConsumerAPI, error)

SetTopics assigns topics to the consumer.

type ConsumerAPI

// ConsumerAPI is the public client consumer API. *Consumer declares all three
// methods, and Kafka.NewConsumer returns a value of this interface type.
type ConsumerAPI interface {
	// SetTopics takes the topics to consume and returns a ConsumerAPI, which
	// allows call chaining, together with an error.
	SetTopics(...string) (ConsumerAPI, error)
	// GetTopics returns the topics as a slice of strings.
	GetTopics() []string
	// Consume takes a context and returns a receive-only channel of Event
	// values, or an error.
	// NOTE(review): the context presumably controls when consumption stops —
	// confirm against the implementation.
	Consume(context.Context) (<-chan Event, error)
}

ConsumerAPI is the public client consumer API

type Event

// Event is the Kafka message content delivered on the channel returned by
// Consume.
type Event struct {
	// Key and Value are the message key and payload as raw bytes.
	Key, Value []byte
	// Topic is the name of the topic the message belongs to.
	Topic string
	// Partition is the partition number within Topic.
	Partition int32
	// Offset is the message offset within Partition.
	Offset int64
	// Headers are the message headers (see Header).
	Headers []Header
	// Timestamp is the message timestamp.
	// NOTE(review): whether this is create time or log-append time is not
	// visible here — confirm.
	Timestamp time.Time
}

Event is the Kafka message content

// Header is a single message header: a name paired with a binary value.
// It is used by Event.Headers, Options.Headers and the WithHeaders option.
type Header struct {
	Key   string // Header name (utf-8 string)
	Value []byte // Header value (nil, empty, or binary)
}

Header Payload

type Kafka

// Kafka is the input (configuration) structure from which consumers and
// producers are created via NewConsumer and NewProducer. The struct tags allow
// it to be decoded from JSON or via mapstructure.
type Kafka struct {
	// BootstrapServer is the Kafka bootstrap server setting.
	// NOTE(review): the json/mapstructure key is spelled "boostrapserver"
	// (missing the first "t" of "bootstrap"). Configuration must use that
	// exact key; correcting the spelling would break existing configs.
	BootstrapServer string `json:"boostrapserver" mapstructure:"boostrapserver"`
	// Username is the username setting.
	// NOTE(review): presumably only used when authentication is enabled
	// (see the AuthKafka option) — confirm.
	Username string `json:"username" mapstructure:"username"`
	// Password is the password paired with Username.
	Password string `json:"password" mapstructure:"password"`
}

Kafka input structure

func (*Kafka) NewConsumer

func (cfg *Kafka) NewConsumer(groupID string, opts ...Option) (ConsumerAPI, error)

NewConsumer creates a ConsumerAPI

func (*Kafka) NewProducer

func (cfg *Kafka) NewProducer(opts ...Option) (ProducerAPI, error)

NewProducer creates a ProducerAPI

type MockClientConsumer

// MockClientConsumer is the internal consumer mock client. It embeds
// ClientConsumerAPI and declares its own Assign, Events, SubscribeTopics,
// Unassign and Unsubscribe methods, i.e. every method of that interface.
// NOTE(review): the Err* fields are presumably the errors returned by the
// like-named mock methods, and Event the value delivered by Events — confirm
// against the method bodies.
type MockClientConsumer struct {
	ClientConsumerAPI
	ErrUnsubscribe       error
	ErrSubscribeTopics   error
	ErrAssignPartition   error
	ErrUnAssignPartition error
	Event                cgo.Event
}

MockClientConsumer is the internal consumer mock client

func (*MockClientConsumer) Assign

func (c *MockClientConsumer) Assign(partitions []cgo.TopicPartition) (err error)

Assign is an internal mock method

func (*MockClientConsumer) Events

func (c *MockClientConsumer) Events() chan cgo.Event

Events is an internal mock method

func (*MockClientConsumer) SubscribeTopics

func (c *MockClientConsumer) SubscribeTopics(topics []string, rebalanceCb cgo.RebalanceCb) (err error)

SubscribeTopics is an internal mock method

func (*MockClientConsumer) Unassign

func (c *MockClientConsumer) Unassign() (err error)

Unassign is an internal mock method

func (*MockClientConsumer) Unsubscribe

func (c *MockClientConsumer) Unsubscribe() (err error)

Unsubscribe is an internal mock method

type MockClientProducer

// MockClientProducer is the internal producer mock client. It embeds
// ClientProducerAPI and declares its own Close, Events, Flush and Produce
// methods, i.e. every method of that interface.
// NOTE(review): ErrProduce is presumably the error returned by the mock
// Produce, and Event the value delivered on the events/delivery channel —
// confirm against the method bodies.
type MockClientProducer struct {
	ClientProducerAPI
	ErrProduce error
	Event      cgo.Event
}

MockClientProducer is the internal producer mock client

func (*MockClientProducer) Close

func (p *MockClientProducer) Close()

Close is an internal mock method

func (*MockClientProducer) Events

func (p *MockClientProducer) Events() chan cgo.Event

Events is an internal mock method

func (*MockClientProducer) Flush

func (p *MockClientProducer) Flush(timeoutMs int) int

Flush is an internal mock method

func (*MockClientProducer) Produce

func (p *MockClientProducer) Produce(msg *cgo.Message, deliveryChan chan cgo.Event) (err error)

Produce is an internal mock method

type Option

// Option is a functional option: a function that receives a pointer to an
// Options value and may modify it. Options are accepted variadically by
// NewOptions, the client constructors, and Producer.Produce.
type Option func(*Options)

Option ...

func Async

func Async(b bool) Option

Async option

func AuthKafka

func AuthKafka(b bool) Option

AuthKafka option

func ConfigMapKey

func ConfigMapKey(s string) Option

ConfigMapKey option

func Debugs

func Debugs(d []string) Option

Debugs option

func EarliestOffset

func EarliestOffset() Option

EarliestOffset Strategy

func FullStats

func FullStats(b bool) Option

FullStats option

func GroupInstanceID

func GroupInstanceID(s string) Option

GroupInstanceID option

func MaxBytes

func MaxBytes(b int) Option

MaxBytes option

func MaxPollInterval

func MaxPollInterval(t time.Duration) Option

MaxPollInterval option

func MaxWait

func MaxWait(t time.Duration) Option

MaxWait option

func Mechanisms

func Mechanisms(m string) Option

Mechanisms option

func MinBytes

func MinBytes(b int) Option

MinBytes option

func NumPartitions

func NumPartitions(n int) Option

NumPartitions option

func PartitionEOF added in v0.8.0

func PartitionEOF(enable bool) Option

PartitionEOF option

func Protocol

func Protocol(p string) Option

Protocol option

func Rebalance

func Rebalance(enable bool) Option

Rebalance option

func ReplicationFactor

func ReplicationFactor(r int) Option

ReplicationFactor option

func SessionTimeout

func SessionTimeout(t time.Duration) Option

SessionTimeout option

func StatInterval

func StatInterval(t time.Duration) Option

StatInterval option

func WithHeaders added in v0.9.4

func WithHeaders(headers []Header) Option

WithHeaders ...

func WriteTimeout

func WriteTimeout(t time.Duration) Option

WriteTimeout option

type Options

// Options holds the settings that Option functions operate on (an Option is
// a func(*Options)); NewOptions builds an Options value from a list of them.
//
// Most fields share their name with an option constructor in this package
// (Protocol, Mechanisms, Async, SessionTimeout, MaxPollInterval, WriteTimeout,
// MaxWait, StatInterval, NumPartitions, ReplicationFactor, MinBytes, MaxBytes,
// AuthKafka, FullStats, Debugs, GroupInstanceID, ConfigMapKey).
// NOTE(review): AutoOffsetReset, EnableRebalance, EnablePartitionEOF and
// Headers are presumably set by EarliestOffset, Rebalance, PartitionEOF and
// WithHeaders respectively — confirm against the implementation.
// NOTE(review): ReadTimeout and BatchTimeout have no like-named option
// constructor in this listing; verify how (or whether) they are set.
type Options struct {
	Protocol           string
	Mechanisms         string
	Async              bool
	SessionTimeout     time.Duration
	MaxPollInterval    time.Duration
	WriteTimeout       time.Duration
	ReadTimeout        time.Duration
	BatchTimeout       time.Duration
	MaxWait            time.Duration
	StatInterval       time.Duration
	NumPartitions      int
	ReplicationFactor  int
	MinBytes           int
	MaxBytes           int
	AuthKafka          bool
	FullStats          bool
	Debugs             []string
	GroupInstanceID    string
	ConfigMapKey       string
	AutoOffsetReset    string
	EnableRebalance    bool
	EnablePartitionEOF bool
	Headers            []Header
}

Options ...

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions ...

type Producer

// Producer is the public Kafka client producer. It embeds ClientProducerAPI,
// so the low-level client methods are promoted onto Producer; *Producer also
// declares SetTopic, GetTopic and Produce, the methods of ProducerAPI.
// Note that *Producer's own Produce(key, value, opts...) shadows the embedded
// interface's Produce(*cgo.Message, chan cgo.Event).
type Producer struct {
	ClientProducerAPI
	// contains filtered or unexported fields
}

Producer is the public Kafka client producer

func (*Producer) GetTopic

func (p *Producer) GetTopic() string

GetTopic returns the topic assigned to the producer.

func (*Producer) Produce

func (p *Producer) Produce(key string, value []byte, opts ...Option) error

Produce sends a message with the given key and value; per-call Options may be supplied.

func (*Producer) SetTopic

func (p *Producer) SetTopic(topic string) ProducerAPI

SetTopic assigns a topic to the producer.

type ProducerAPI

// ProducerAPI is the public client producer API. *Producer declares all three
// methods, and Kafka.NewProducer returns a value of this interface type.
type ProducerAPI interface {
	// SetTopic takes the topic name and returns a ProducerAPI, which allows
	// call chaining.
	SetTopic(string) ProducerAPI
	// GetTopic returns the topic as a string.
	GetTopic() string
	// Produce takes the message key, the message value, and optional
	// per-call Options, and returns an error.
	Produce(string, []byte, ...Option) error
}

ProducerAPI is the public client producer API

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL