Documentation ¶
Index ¶
- Variables
- type Consumer
- func (c *Consumer) Assign(partitions ...librdkafka.TopicPartition) (err error)
- func (c *Consumer) Close() (err error)
- func (c *Consumer) Read(timeout time.Duration) (msg *librdkafka.Message, err error)
- func (c *Consumer) ReadBytes(timeout time.Duration) (topic string, msg []byte, err error)
- func (c *Consumer) ReadString(timeout time.Duration) (topic, msg string, err error)
- func (c *Consumer) RunConsume(lockOsThread bool, timeout time.Duration, ...)
- func (c *Consumer) StopConsume()
- func (c *Consumer) Subscribe(topics []string, cb librdkafka.RebalanceCb) (err error)
- func (c *Consumer) UnAssign() (err error)
- func (c *Consumer) Unsubscribe() (err error)
- type Option
- type Producer
- func (p *Producer) Begin() (err error)
- func (p *Producer) Close(timeoutMs int)
- func (p *Producer) CloseIn()
- func (p *Producer) Commit(ctx context.Context) (err error)
- func (p *Producer) ErrorHandler(deliveryChan chan librdkafka.Event, ...)
- func (p *Producer) Flush(timeoutMs int)
- func (p *Producer) InitTrans(ctx context.Context) (err error)
- func (p *Producer) Produce(msg *librdkafka.Message, deliveryChan chan librdkafka.Event) (err error)
- func (p *Producer) ProduceBytes(topic *string, msg []byte, deliveryChan chan librdkafka.Event) (err error)
- func (p *Producer) ProduceString(topic *string, msg string, deliveryChan chan librdkafka.Event) (err error)
- func (p *Producer) Purge(flags int) (err error)
- func (p *Producer) Rollback(ctx context.Context) (err error)
- func (p *Producer) SuccessCount() uint64
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrLocalTimeout = errors.New("Local: Timed out") ErrProducerHasClosed = errors.New("Local: Producer Has Closed") ErrQueueFull = errors.New("Local: Queue full") )
View Source
var ( BuildMsg = func(topic *string, msg []byte) (message *librdkafka.Message) { message = msgGet() message.TopicPartition.Topic = topic message.TopicPartition.Partition = librdkafka.PartitionAny message.Value = msg message.Timestamp = time.Now() message.TimestampType = librdkafka.TimestampCreateTime return } )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) Assign ¶
func (c *Consumer) Assign(partitions ...librdkafka.TopicPartition) (err error)
func (*Consumer) ReadString ¶
func (*Consumer) RunConsume ¶
func (*Consumer) StopConsume ¶
func (c *Consumer) StopConsume()
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe(topics []string, cb librdkafka.RebalanceCb) (err error)
func (*Consumer) Unsubscribe ¶
type Option ¶
type Option struct {
Properties map[string]interface{} `yaml:"properties" json:"properties"`
}
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func (*Producer) ErrorHandler ¶
func (p *Producer) ErrorHandler(deliveryChan chan librdkafka.Event, handler func(msg *librdkafka.Message, err error))
func (*Producer) Produce ¶
func (p *Producer) Produce(msg *librdkafka.Message, deliveryChan chan librdkafka.Event) (err error)
func (*Producer) ProduceBytes ¶
func (*Producer) ProduceString ¶
func (*Producer) SuccessCount ¶
Click to show internal directories.
Click to hide internal directories.