gkafka

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2019 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package gkafka provides producer and consumer client for kafka server.

Kafka客户端.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	Config *Config
	// contains filtered or unexported fields
}

Kafka Client(Consumer/SyncProducer/AsyncProducer)

func NewClient

func NewClient(config *Config) *Client

New a kafka client.

func (*Client) AsyncSend

func (client *Client) AsyncSend(message *Message) error

Send data to kafka in asynchronized way(concurrent safe).

func (*Client) Close

func (client *Client) Close()

Close client.

func (*Client) MarkOffset

func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error

标记指定topic 分区开始读取位置

func (*Client) Receive

func (client *Client) Receive() (*Message, error)

Receive message from kafka from specified topics in config, in BLOCKING way, gkafka will handle offset tracking automatically.

func (*Client) SyncSend

func (client *Client) SyncSend(message *Message) error

Send data to kafka in synchronized way.

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Get all topics from kafka server. 这里创建独立的消费客户端获取topics,获取完之后销毁该客户端对象。

type Config

type Config struct {
	GroupId        string // group id for consumer.
	Servers        string // server list, multiple servers joined by ','.
	Topics         string // topic list, multiple topics joined by ','.
	AutoMarkOffset bool   // auto mark message read after consumer message from server
	sarama.Config
}

kafka Client based on sarama.Config

func NewConfig

func NewConfig() *Config

New a default configuration object.

type Message

type Message struct {
	Value     []byte
	Key       []byte
	Topic     string
	Partition int
	Offset    int
	// contains filtered or unexported fields
}

Kafka Message.

func (*Message) MarkOffset

func (msg *Message) MarkOffset()

自动标记已读取

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL