kafka

package
Version: v1.0.6
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrLocalTimeout is the sentinel error carrying the message "Local: Timed out".
// NOTE(review): presumably returned when a local operation times out; confirm
// against the call sites, which are not visible here.
var ErrLocalTimeout = errors.New("Local: Timed out")

// ErrProducerHasClosed is the sentinel error carrying the message
// "Local: Producer Has Closed".
// NOTE(review): presumably returned when a closed producer is used; confirm
// against the call sites.
var ErrProducerHasClosed = errors.New("Local: Producer Has Closed")

// ErrQueueFull is the sentinel error carrying the message "Local: Queue full".
// NOTE(review): presumably reported when the local produce queue is full;
// confirm against the call sites.
var ErrQueueFull = errors.New("Local: Queue full")
View Source
// BuildMsg populates a message obtained from msgGet with the given topic and
// payload and returns it.
//
// It is a package-level variable holding a function value, so it can be
// reassigned. The returned message has:
//   - TopicPartition.Topic set to topic (the pointer is stored as-is),
//   - TopicPartition.Partition set to librdkafka.PartitionAny,
//   - Value set to msg (the slice is stored as-is, not copied),
//   - Timestamp set to the current time and TimestampType set to
//     librdkafka.TimestampCreateTime.
//
// Fields not listed above keep whatever values msgGet left in them.
// NOTE(review): whether msgGet returns a zeroed or a recycled message is not
// visible here; confirm before relying on the other fields.
var BuildMsg = func(topic *string, msg []byte) (message *librdkafka.Message) {
	message = msgGet()

	// Payload and creation timestamp.
	message.Value = msg
	message.TimestampType = librdkafka.TimestampCreateTime
	message.Timestamp = time.Now()

	// Destination topic; the partition is left as PartitionAny.
	dest := &message.TopicPartition
	dest.Topic = topic
	dest.Partition = librdkafka.PartitionAny

	return message
}

Functions

This section is empty.

Types

type Consumer

// Consumer is the consumer type of this package. Instances are returned by
// NewConsumer; all of its fields are unexported.
type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(option *Option) (consumer *Consumer, err error)

func (*Consumer) Assign

func (c *Consumer) Assign(partitions ...librdkafka.TopicPartition) (err error)

func (*Consumer) Close

func (c *Consumer) Close() (err error)

func (*Consumer) Read

func (c *Consumer) Read(timeout time.Duration) (msg *librdkafka.Message, err error)

func (*Consumer) ReadBytes

func (c *Consumer) ReadBytes(timeout time.Duration) (topic string, msg []byte, err error)

func (*Consumer) ReadString

func (c *Consumer) ReadString(timeout time.Duration) (topic, msg string, err error)

func (*Consumer) RunConsume

func (c *Consumer) RunConsume(lockOsThread bool, timeout time.Duration, handler func(topic string, msg []byte, err error))

func (*Consumer) StopConsume

func (c *Consumer) StopConsume()

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topics []string, cb librdkafka.RebalanceCb) (err error)

func (*Consumer) UnAssign

func (c *Consumer) UnAssign() (err error)

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe() (err error)

type Option

// Option is the configuration value accepted by NewConsumer and NewProducer.
type Option struct {
	// Properties is a free-form map of configuration entries. It is
	// (de)serialized under the key "properties" in both YAML and JSON.
	// NOTE(review): presumably these entries are forwarded to the underlying
	// librdkafka client configuration; confirm against the constructors.
	Properties map[string]interface{} `yaml:"properties" json:"properties"`
}

type Producer

// Producer is the producer type of this package. Instances are returned by
// NewProducer; all of its fields are unexported.
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(option *Option) (producer *Producer, err error)

func (*Producer) Begin

func (p *Producer) Begin() (err error)

func (*Producer) Close

func (p *Producer) Close(timeoutMs int)

func (*Producer) CloseIn

func (p *Producer) CloseIn()

func (*Producer) Commit

func (p *Producer) Commit(ctx context.Context) (err error)

func (*Producer) ErrorHandler

func (p *Producer) ErrorHandler(deliveryChan chan librdkafka.Event, handler func(msg *librdkafka.Message, err error))

func (*Producer) Flush

func (p *Producer) Flush(timeoutMs int)

func (*Producer) InitTrans

func (p *Producer) InitTrans(ctx context.Context) (err error)

func (*Producer) Produce

func (p *Producer) Produce(msg *librdkafka.Message, deliveryChan chan librdkafka.Event) (err error)

func (*Producer) ProduceBytes

func (p *Producer) ProduceBytes(topic *string, msg []byte, deliveryChan chan librdkafka.Event) (err error)

func (*Producer) ProduceString

func (p *Producer) ProduceString(topic *string, msg string, deliveryChan chan librdkafka.Event) (err error)

func (*Producer) Purge

func (p *Producer) Purge(flags int) (err error)

func (*Producer) Rollback

func (p *Producer) Rollback(ctx context.Context) (err error)

func (*Producer) SuccessCount

func (p *Producer) SuccessCount() uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL