Documentation ¶
Overview ¶
Package kafka provides an async producer, a metadata client, a consumer, and a sync producer for Kafka. Created by chenguolin 2019-04-20.
Index ¶
- Constants
- Variables
- type AsyncProducer
- type Client
- func (c *Client) CreatePartitions(topic string, totalPartition int32) error
- func (c *Client) CreateTopic(topic string, partitions int32, replicationFactor int16) error
- func (c *Client) DeleteRecord(topic string, partitionID int32, endOffset int64) error
- func (c *Client) DeleteTopic(topic string) error
- func (c *Client) DescribeConsumerGroup(group string) (*sarama.GroupDescription, error)
- func (c *Client) DescribeTopic(topic string) ([]*sarama.PartitionMetadata, error)
- func (c *Client) GetNewestOffset(topic string, partitionID int32) (int64, error)
- func (c *Client) GetOldestOffset(topic string, partitionID int32) (int64, error)
- func (c *Client) ListConsumerGroups() (map[string]string, error)
- func (c *Client) ListTopics() ([]string, error)
- type Consumer
- type SyncProducer
Constants ¶
const (
	OffsetNewset = iota
	OffsetOldest
)
Offset definitions. OffsetNewset (spelled as exported) has the value 0 and OffsetOldest has the value 1.
Variables ¶
var (
	ErrBrokersIsEmpty = errors.New("brokers is empty")
	ErrTopicIsEmpty   = errors.New("topic is empty")
	ErrGroupIDIsEmpty = errors.New("groupId is empty")
)
Error definitions.
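Because these are sentinel errors created with errors.New, callers can match them with errors.Is even after wrapping. The following is a minimal sketch under stated assumptions: the three variables are local copies so the program compiles on its own, and validate and isConfigError are hypothetical helpers, since this documentation does not show where the package actually returns each error.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, so the sketch compiles
// without importing the package.
var (
	ErrBrokersIsEmpty = errors.New("brokers is empty")
	ErrTopicIsEmpty   = errors.New("topic is empty")
	ErrGroupIDIsEmpty = errors.New("groupId is empty")
)

// validate is a hypothetical helper. The package's real checks are not
// shown in this documentation.
func validate(brokers, topic, groupID string) error {
	switch {
	case brokers == "":
		return ErrBrokersIsEmpty
	case topic == "":
		return ErrTopicIsEmpty
	case groupID == "":
		return ErrGroupIDIsEmpty
	}
	return nil
}

// isConfigError reports whether err is, or wraps, one of the sentinels.
func isConfigError(err error) bool {
	return errors.Is(err, ErrBrokersIsEmpty) ||
		errors.Is(err, ErrTopicIsEmpty) ||
		errors.Is(err, ErrGroupIDIsEmpty)
}

func main() {
	// "localhost:9092" and "g1" are illustrative values only.
	err := fmt.Errorf("starting consumer: %w", validate("localhost:9092", "", "g1"))
	fmt.Println(errors.Is(err, ErrTopicIsEmpty)) // true
	fmt.Println(isConfigError(err))              // true
}
```

Matching with errors.Is rather than comparing message strings keeps callers working if the error is later wrapped with extra context.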
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
// contains filtered or unexported fields
}
AsyncProducer is an async producer client. It publishes Kafka messages using a non-blocking API, routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.
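The deadlock warning above can be illustrated with plain channels. This is a toy model and not this package's implementation: toyProducer, its Errors and Close methods, and sendAll are hypothetical names, and the "empty message fails" rule exists only to produce errors. It shows why an unread, unbuffered error channel stalls the background goroutine, and how a draining goroutine avoids that.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyProducer is a hypothetical stand-in, not the package's AsyncProducer.
type toyProducer struct {
	input chan string
	errs  chan error
	done  sync.WaitGroup
}

func newToyProducer() *toyProducer {
	p := &toyProducer{input: make(chan string), errs: make(chan error)}
	p.done.Add(1)
	go func() {
		defer p.done.Done()
		defer close(p.errs)
		for msg := range p.input {
			if msg == "" {
				// Unbuffered send: blocks until someone reads from errs.
				// With no reader, the loop stops and Send blocks too.
				p.errs <- errors.New("empty message")
			}
		}
	}()
	return p
}

func (p *toyProducer) Send(msg string)      { p.input <- msg }
func (p *toyProducer) Errors() <-chan error { return p.errs }

// Close stops the background goroutine and waits for it to exit.
func (p *toyProducer) Close() {
	close(p.input)
	p.done.Wait()
}

// sendAll sends every message while a separate goroutine drains the error
// channel, and returns how many sends failed.
func sendAll(msgs []string) int {
	p := newToyProducer()
	failed := 0
	var drained sync.WaitGroup
	drained.Add(1)
	go func() {
		defer drained.Done()
		for range p.Errors() {
			failed++
		}
	}()
	for _, m := range msgs {
		p.Send(m)
	}
	p.Close()
	drained.Wait()
	return failed
}

func main() {
	fmt.Println(sendAll([]string{"a", "", "b", ""})) // 2
}
```

Removing the draining goroutine from sendAll makes the second Send after an empty message block forever, which is the failure mode the warning describes.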
func NewAsyncProducer ¶
func NewAsyncProducer(brokers string) (*AsyncProducer, error)
NewAsyncProducer creates a new AsyncProducer client.
func (*AsyncProducer) Send ¶
func (ap *AsyncProducer) Send(topic, key, message string)
Send sends a message to Kafka.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks; it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users; however, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default of one client per producer/consumer.
func (*Client) CreatePartitions ¶
func (c *Client) CreatePartitions(topic string, totalPartition int32) error
CreatePartitions increases the number of partitions of the topic to the given total. If partitions are increased for a topic whose messages have keys, the partitioning logic or ordering of the messages will be affected. It may take several seconds after this method returns success for all the brokers to become aware that the partitions have been created. During this time, ClusterAdmin#describeTopics may not return information about the new partitions. This operation is supported by brokers with version 1.0.0 or higher.
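The effect on keyed messages follows from how hash partitioning works: the partition is the key's hash modulo the partition count, so changing the count can move a key to a different partition. The sketch below assumes an FNV-1a hash with an unsigned remainder. partitionFor is a hypothetical helper and a simplified stand-in, not the partitioner this package or the broker actually uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition by hashing it and taking the
// remainder. Simplified stand-in for a hash partitioner (assumption).
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return h.Sum32() % numPartitions
}

func main() {
	// Illustrative keys only. Compare each key's partition before and
	// after growing the topic from 4 to 8 partitions.
	for _, key := range []string{"user-1", "user-2", "user-3", "user-4"} {
		fmt.Printf("%s: partition %d of 4, partition %d of 8\n",
			key, partitionFor(key, 4), partitionFor(key, 8))
	}
}
```

Any key whose partition differs between the two columns would have its new messages land on a different partition than its old ones, so per-key ordering across the resize is no longer guaranteed.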
func (*Client) CreateTopic ¶
func (c *Client) CreateTopic(topic string, partitions int32, replicationFactor int16) error
CreateTopic creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. It may take several seconds after CreateTopic returns success for all the brokers to become aware that the topic has been created. During this time, listTopics may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
func (*Client) DeleteRecord ¶
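func (c *Client) DeleteRecord(topic string, partitionID int32, endOffset int64) error
DeleteRecord deletes records whose offset is smaller than the given offset of the corresponding partition. This operation is supported by brokers with version 0.11.0.0 or higher.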
func (*Client) DeleteTopic ¶
func (c *Client) DeleteTopic(topic string) error
DeleteTopic deletes a topic. It may take several seconds after DeleteTopic returns success for all the brokers to become aware that the topic is gone. During this time, listTopics may continue to return information about the deleted topic. If delete.topic.enable is false on the brokers, deleteTopic will mark the topic for deletion, but not actually delete it.
func (*Client) DescribeConsumerGroup ¶
func (c *Client) DescribeConsumerGroup(group string) (*sarama.GroupDescription, error)
DescribeConsumerGroup describes the given consumer group.
func (*Client) DescribeTopic ¶
func (c *Client) DescribeTopic(topic string) ([]*sarama.PartitionMetadata, error)
DescribeTopic describes the given topic in the cluster.
func (*Client) GetNewestOffset ¶
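func (c *Client) GetNewestOffset(topic string, partitionID int32) (int64, error)
GetNewestOffset returns the newest offset of the given topic partition.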
func (*Client) GetOldestOffset ¶
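func (c *Client) GetOldestOffset(topic string, partitionID int32) (int64, error)
GetOldestOffset returns the oldest offset of the given topic partition.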
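One common use of the two offset methods is to estimate how many messages a partition currently holds. The sketch below assumes the newest offset is the offset the next produced message will receive, as in sarama's GetOffset. This documentation does not confirm that for the wrapper. The offsetReader interface copies the two documented method signatures, while fakeClient and retained are hypothetical names used so the example runs without a broker.

```go
package main

import "fmt"

// offsetReader lists the two documented Client methods used here.
type offsetReader interface {
	GetNewestOffset(topic string, partitionID int32) (int64, error)
	GetOldestOffset(topic string, partitionID int32) (int64, error)
}

// fakeClient is a hypothetical in-memory stand-in for *Client.
type fakeClient struct{ oldest, newest int64 }

func (f fakeClient) GetNewestOffset(string, int32) (int64, error) { return f.newest, nil }
func (f fakeClient) GetOldestOffset(string, int32) (int64, error) { return f.oldest, nil }

// retained returns newest minus oldest for one partition. Under the
// assumption stated above, that is the number of messages still stored.
func retained(c offsetReader, topic string, partitionID int32) (int64, error) {
	newest, err := c.GetNewestOffset(topic, partitionID)
	if err != nil {
		return 0, fmt.Errorf("newest offset: %w", err)
	}
	oldest, err := c.GetOldestOffset(topic, partitionID)
	if err != nil {
		return 0, fmt.Errorf("oldest offset: %w", err)
	}
	return newest - oldest, nil
}

func main() {
	// "events" and the offsets are illustrative values only.
	n, err := retained(fakeClient{oldest: 100, newest: 250}, "events", 0)
	fmt.Println(n, err) // 150 <nil>
}
```

Accepting an interface instead of *Client keeps the helper testable, and a real *Client should satisfy it as long as its methods match the documented signatures.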
func (*Client) ListConsumerGroups ¶
func (c *Client) ListConsumerGroups() (map[string]string, error)
ListConsumerGroups lists the consumer groups available in the cluster.
func (*Client) ListTopics ¶
func (c *Client) ListTopics() ([]string, error)
ListTopics lists the topics available in the cluster with the default options.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a consumer client.
func NewConsumer ¶
NewConsumer creates a new Consumer.
func (*Consumer) CommitOffset ¶
func (c *Consumer) CommitOffset(msg *sarama.ConsumerMessage)
CommitOffset marks a message as processed.
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
Messages returns the message channel.
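Messages and CommitOffset are typically combined in a loop that processes each message and marks it only after processing succeeds. The following is a sketch of that pattern, not the package's own code. The message type is a local stand-in for sarama.ConsumerMessage so the example runs without dependencies, and source, fakeConsumer, consume, and runFake are hypothetical names. Whether the real channel is ever closed is not stated in this documentation.

```go
package main

import "fmt"

// message is a local stand-in for sarama.ConsumerMessage (assumption).
type message struct {
	Offset int64
	Value  []byte
}

// source mirrors the shape of the two documented Consumer methods,
// using the stand-in message type.
type source interface {
	Messages() <-chan *message
	CommitOffset(msg *message)
}

// fakeConsumer is a hypothetical in-memory consumer that records commits.
type fakeConsumer struct {
	ch        chan *message
	committed []int64
}

func (f *fakeConsumer) Messages() <-chan *message { return f.ch }
func (f *fakeConsumer) CommitOffset(msg *message) { f.committed = append(f.committed, msg.Offset) }

// consume handles each message and commits it only after handle succeeds,
// so a failed message is not marked as processed.
func consume(s source, handle func(*message) error) error {
	for msg := range s.Messages() {
		if err := handle(msg); err != nil {
			return fmt.Errorf("offset %d: %w", msg.Offset, err)
		}
		s.CommitOffset(msg)
	}
	return nil
}

// runFake feeds the given values through consume and returns the offsets
// that were committed.
func runFake(values ...string) []int64 {
	f := &fakeConsumer{ch: make(chan *message, len(values))}
	for i, v := range values {
		f.ch <- &message{Offset: int64(i), Value: []byte(v)}
	}
	close(f.ch)
	_ = consume(f, func(m *message) error { return nil })
	return f.committed
}

func main() {
	fmt.Println(runFake("a", "b", "c")) // [0 1 2]
}
```

Committing after the handler returns gives at-least-once processing: a crash between handling and committing means the message may be delivered again.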
type SyncProducer ¶
type SyncProducer struct {
// contains filtered or unexported fields
}
SyncProducer is a sync producer client. It publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.
func NewSyncProducer ¶
func NewSyncProducer(brokers string) (*SyncProducer, error)
NewSyncProducer creates a new SyncProducer client.