kafka

package
v0.0.0-...-e4caca2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2023 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BalancerStrategy

type BalancerStrategy int
const (
	MurMur2    BalancerStrategy = 0
	CRC32      BalancerStrategy = 1
	Hash       BalancerStrategy = 2
	LeastBytes BalancerStrategy = 3
	RoundRobin BalancerStrategy = 4
)

type Client

type Client struct {
	Brokers []string
	// contains filtered or unexported fields
}

func NewClient

func NewClient(servers string) *Client

func (*Client) CreateTopic

func (client *Client) CreateTopic(topic string, replica, partitions int) error

func (*Client) NewConsumer

func (client *Client) NewConsumer(topic string, configs ...*ConsumerConfig) *Consumer

func (*Client) NewProducer

func (client *Client) NewProducer(topic string, configs ...*ProducerConfig) *Producer

func (*Client) Partitions

func (client *Client) Partitions() ([]Partition, error)

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

type Compression

type Compression = kafka.Compression

type Consumer

type Consumer struct {
	Config          *kafka.ReaderConfig
	Reader          *kafka.Reader
	MessageHandlers []ConsumerHandler
	ErrorHandler    func(err error) bool
	Context         context.Context
}

func NewConsumer

func NewConsumer(servers, topic string, configs ...*ConsumerConfig) *Consumer

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) OnError

func (c *Consumer) OnError(fn func(err error) bool)

func (*Consumer) OnMessage

func (c *Consumer) OnMessage(fn ConsumerHandler)

type ConsumerConfig

type ConsumerConfig struct {
	// contains filtered or unexported fields
}

func NewConsumerConfig

func NewConsumerConfig() *ConsumerConfig

func (*ConsumerConfig) Brokers

func (c *ConsumerConfig) Brokers(input []string) *ConsumerConfig

func (*ConsumerConfig) CommitInterval

func (c *ConsumerConfig) CommitInterval(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) GroupID

func (c *ConsumerConfig) GroupID(input string) *ConsumerConfig

func (*ConsumerConfig) GroupTopics

func (c *ConsumerConfig) GroupTopics(input []string) *ConsumerConfig

func (*ConsumerConfig) HeartbeatInterval

func (c *ConsumerConfig) HeartbeatInterval(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) JoinGroupBackoff

func (c *ConsumerConfig) JoinGroupBackoff(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) MaxAttempts

func (c *ConsumerConfig) MaxAttempts(input int) *ConsumerConfig

func (*ConsumerConfig) MaxBytes

func (c *ConsumerConfig) MaxBytes(input int) *ConsumerConfig

func (*ConsumerConfig) MaxWait

func (c *ConsumerConfig) MaxWait(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) MinBytes

func (c *ConsumerConfig) MinBytes(input int) *ConsumerConfig

func (*ConsumerConfig) Partition

func (c *ConsumerConfig) Partition(input int) *ConsumerConfig

func (*ConsumerConfig) PartitionWatchInterval

func (c *ConsumerConfig) PartitionWatchInterval(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) QueueCapacity

func (c *ConsumerConfig) QueueCapacity(input int) *ConsumerConfig

func (*ConsumerConfig) ReadBackoffMax

func (c *ConsumerConfig) ReadBackoffMax(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) ReadBackoffMin

func (c *ConsumerConfig) ReadBackoffMin(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) ReadLagInterval

func (c *ConsumerConfig) ReadLagInterval(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) RebalanceTimeout

func (c *ConsumerConfig) RebalanceTimeout(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) RetentionTime

func (c *ConsumerConfig) RetentionTime(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) SessionTimeout

func (c *ConsumerConfig) SessionTimeout(input time.Duration) *ConsumerConfig

func (*ConsumerConfig) StartOffset

func (c *ConsumerConfig) StartOffset(input int64) *ConsumerConfig

func (*ConsumerConfig) Topic

func (c *ConsumerConfig) Topic(input string) *ConsumerConfig

func (*ConsumerConfig) WatchPartitionChanges

func (c *ConsumerConfig) WatchPartitionChanges(input bool) *ConsumerConfig

type ConsumerHandler

type ConsumerHandler func(message Message)

type Message

type Message kafka.Message

type Partition

type Partition kafka.Partition

type Producer

type Producer struct {
	Writer  *kafka.Writer
	Context context.Context
}

func NewProducer

func NewProducer(servers, topic string, configs ...*ProducerConfig) *Producer

func (*Producer) Write

func (p *Producer) Write(messages ...Message) error

func (*Producer) WriteWithContext

func (p *Producer) WriteWithContext(ctx context.Context, messages ...Message) error

type ProducerConfig

type ProducerConfig struct {
	// contains filtered or unexported fields
}

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

func (*ProducerConfig) Ack

func (*ProducerConfig) Async

func (c *ProducerConfig) Async(input bool) *ProducerConfig

func (*ProducerConfig) Balancer

func (c *ProducerConfig) Balancer(input BalancerStrategy) *ProducerConfig

func (*ProducerConfig) BatchBytes

func (c *ProducerConfig) BatchBytes(input int64) *ProducerConfig

func (*ProducerConfig) BatchSize

func (c *ProducerConfig) BatchSize(input int) *ProducerConfig

func (*ProducerConfig) BatchTimeout

func (c *ProducerConfig) BatchTimeout(input time.Duration) *ProducerConfig

func (*ProducerConfig) Compression

func (c *ProducerConfig) Compression(input Compression) *ProducerConfig

func (*ProducerConfig) MaxAttempts

func (c *ProducerConfig) MaxAttempts(input int) *ProducerConfig

func (*ProducerConfig) ReadTimeout

func (c *ProducerConfig) ReadTimeout(input time.Duration) *ProducerConfig

func (*ProducerConfig) WriteTimeout

func (c *ProducerConfig) WriteTimeout(input time.Duration) *ProducerConfig

type RequiredAcks

type RequiredAcks kafka.RequiredAcks
const (
	RequireNone RequiredAcks = 0
	RequireOne  RequiredAcks = 1
	RequireAll  RequiredAcks = -1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL