kafka

package
v1.0.0
Warning

This package is not in the latest version of its module.

Published: Jul 13, 2019 License: Apache-2.0 Imports: 9 Imported by: 1

Documentation

Overview

Package kafka provides an async producer, a client for metadata and admin operations, a consumer, and a sync producer. Created by chenguolin, 2019-04-20.

Constants

const (
	OffsetNewset = iota
	OffsetOldest
)

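Offset definitions: the newest and oldest offset options. Note that the exported identifier is spelled OffsetNewset, as shown above.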

Variables

var (
	ErrBrokersIsEmpty = errors.New("brokers is empty")
	ErrTopicIsEmpty   = errors.New("topic is empty")
	ErrGroupIDIsEmpty = errors.New("groupId is empty")
)

Error definitions: sentinel errors for an empty brokers, topic, or group ID argument.

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

AsyncProducer is an async producer client. It publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope.

func NewAsyncProducer

func NewAsyncProducer(brokers string) (*AsyncProducer, error)

NewAsyncProducer creates a new async producer client.

func (*AsyncProducer) Close

func (ap *AsyncProducer) Close() error

Close closes the async producer.

func (*AsyncProducer) Send

func (ap *AsyncProducer) Send(topic, key, message string)

Send sends a message to Kafka.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks: it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users. However, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default of one client per producer/consumer.

func NewClient

func NewClient(brokers string) (*Client, error)

NewClient creates a new Kafka client.

func (*Client) CreatePartitions

func (c *Client) CreatePartitions(topic string, totalPartition int32) error

CreatePartitions increases the number of partitions of the given topic. If partitions are added to a topic whose messages are keyed, the key-to-partition mapping, and therefore the ordering of messages, will be affected. It may take several seconds after this method returns success for all the brokers to become aware that the partitions have been created. During this time, ClusterAdmin#describeTopics may not return information about the new partitions. This operation is supported by brokers with version 1.0.0 or higher.
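To see why adding partitions disturbs keyed topics, consider a hash partitioner that picks hash(key) modulo the partition count: changing the count changes the result for many keys. The sketch below illustrates that idea with FNV-1a. partitionFor is a hypothetical function and is not guaranteed to match the exact algorithm of any Kafka client.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition by hashing it and taking the result
// modulo the partition count. numPartitions must be greater than zero.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // the remainder can be negative after the int32 conversion
	}
	return p
}

func main() {
	keys := []string{"user-1", "user-2", "user-3", "user-4"}
	for _, k := range keys {
		before := partitionFor(k, 3) // topic with 3 partitions
		after := partitionFor(k, 4)  // same topic grown to 4 partitions
		fmt.Printf("%s: partition %d -> %d (moved: %t)\n", k, before, after, before != after)
	}
}
```

Any key that moves will have its older messages in one partition and its newer messages in another, so per-key ordering across the change is no longer guaranteed.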

func (*Client) CreateTopic

func (c *Client) CreateTopic(topic string, partitions int32, replicationFactor int16) error

CreateTopic creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. It may take several seconds after CreateTopic returns success for all the brokers to become aware that the topic has been created. During this time, listTopics may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.

func (*Client) DeleteRecord

func (c *Client) DeleteRecord(topic string, partitionID int32, endOffset int64) error

DeleteRecord deletes records whose offset is smaller than the given offset of the corresponding partition. This operation is supported by brokers with version 0.11.0.0 or higher.

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(topic string) error

DeleteTopic deletes a topic. It may take several seconds after DeleteTopic returns success for all the brokers to become aware that the topic is gone. During this time, listTopics may continue to return information about the deleted topic. If delete.topic.enable is false on the brokers, DeleteTopic will mark the topic for deletion but not actually delete it.

func (*Client) DescribeConsumerGroup

func (c *Client) DescribeConsumerGroup(group string) (*sarama.GroupDescription, error)

DescribeConsumerGroup describes the given consumer group.

func (*Client) DescribeTopic

func (c *Client) DescribeTopic(topic string) ([]*sarama.PartitionMetadata, error)

DescribeTopic describes a topic in the cluster, returning its partition metadata.

func (*Client) GetNewestOffset

func (c *Client) GetNewestOffset(topic string, partitionID int32) (int64, error)

GetNewestOffset returns the newest offset of the given topic partition.

func (*Client) GetOldestOffset

func (c *Client) GetOldestOffset(topic string, partitionID int32) (int64, error)

GetOldestOffset returns the oldest offset of the given topic partition.

func (*Client) ListConsumerGroups

func (c *Client) ListConsumerGroups() (map[string]string, error)

ListConsumerGroups lists the consumer groups available in the cluster.

func (*Client) ListTopics

func (c *Client) ListTopics() ([]string, error)

ListTopics lists the topics available in the cluster with the default options.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a consumer client.

func NewConsumer

func NewConsumer(brokers, topic, groupID string, defaultOffset int) (*Consumer, error)

NewConsumer creates a new consumer.

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the consumer.

func (*Consumer) CommitOffset

func (c *Consumer) CommitOffset(msg *sarama.ConsumerMessage)

CommitOffset marks a message as processed.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns the errors channel.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns the message channel.

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer is a sync producer client. It publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks: it may not be garbage-collected automatically when it passes out of scope.

The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.

func NewSyncProducer

func NewSyncProducer(brokers string) (*SyncProducer, error)

NewSyncProducer creates a new sync producer client.

func (*SyncProducer) Close

func (sp *SyncProducer) Close() error

Close closes the sync producer.

func (*SyncProducer) Send

func (sp *SyncProducer) Send(topic, key, message string) (partitionID int32, offset int64, err error)

Send sends a message to Kafka and returns the partition ID and offset assigned to it.
