kafka

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageToProducerMessage

func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)

func NewServiceScript added in v0.5.7

func NewServiceScript(config interface{}) (res map[string]interface{}, err error)

Types

type BrokerInfo added in v0.1.0

// BrokerInfo is the JSON-serializable description of a single broker.
// Info.Brokers holds a slice of these.
type BrokerInfo struct {
	// Id is the broker's numeric identifier.
	Id        int32  `json:"id"`
	// Addr is the broker's address string.
	Addr      string `json:"addr"`
	// Rack is the broker's rack label.
	Rack      string `json:"rack"`
	// Connected is a connection flag for this broker.
	// NOTE(review): presumably "client currently connected" — confirm against Info().
	Connected bool   `json:"connected"`
}

type Config

// Config is the Kafka configuration passed to New.
//
// Username, Password and CertPath carry the omitempty JSON option, so they are
// left out of the encoded JSON when empty; Address is always encoded.
type Config struct {
	// Address is the Kafka address string.
	// NOTE(review): Service.GetServers returns []string, so this may be a
	// delimited list of brokers — confirm the expected format.
	Address  string `json:"address"`
	// Username is the optional user name.
	Username string `json:"username,omitempty"`
	// Password is the optional password.
	Password string `json:"password,omitempty"`
	// CertPath is the optional path to a certificate.
	// NOTE(review): expected certificate format is not visible here.
	CertPath string `json:"certPath,omitempty"`
}

Config is the Kafka configuration.

type Group added in v0.2.1

// Group is one entry of the consumer-group listing returned by
// ListConsumerGroups.
type Group struct {
	// GroupId is the consumer group ID.
	GroupId string `json:"groupId"`
	// Cluster is a string associated with the group.
	// NOTE(review): its exact meaning is not visible here — confirm what
	// ListConsumerGroups stores in it.
	Cluster string `json:"cluster"`
}

type GroupDescription added in v0.2.0

// GroupDescription is the JSON-serializable description of one consumer group,
// as returned by DescribeConsumerGroups.
// NOTE(review): the field set appears to mirror sarama's GroupDescription with
// JSON tags added — confirm.
type GroupDescription struct {
	// Version defines the protocol version to use for encode and decode
	Version int16 `json:"version"`
	// Err contains the describe error as the KError type.
	Err sarama.KError `json:"err"`
	// ErrorCode contains the describe error, or 0 if there was no error.
	ErrorCode int16 `json:"errorCode"`
	// GroupId contains the group ID string.
	GroupId string `json:"groupId"`
	// State contains the group state string, or the empty string.
	State string `json:"state"`
	// ProtocolType contains the group protocol type, or the empty string.
	ProtocolType string `json:"protocolType"`
	// Protocol contains the group protocol data, or the empty string.
	Protocol string `json:"protocol"`
	// Members contains the group members.
	// NOTE(review): the map key is presumably the member ID — confirm.
	Members map[string]*GroupMemberDescription `json:"members"`
	// AuthorizedOperations contains a 32-bit bitfield to represent authorized
	// operations for this group.
	AuthorizedOperations int32 `json:"authorizedOperations"`
}

type GroupMemberDescription added in v0.2.0

// GroupMemberDescription is the JSON-serializable description of one member of
// a consumer group; GroupDescription.Members holds pointers to these.
type GroupMemberDescription struct {
	// Version defines the protocol version to use for encode and decode
	Version int16 `json:"version"`
	// MemberId contains the member ID assigned by the group coordinator.
	MemberId string `json:"memberId"`
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user. It is a pointer and may be nil.
	GroupInstanceId *string `json:"groupInstanceId"`
	// ClientId contains the client ID used in the member's latest join group
	// request.
	ClientId string `json:"clientId"`
	// ClientHost contains the client host.
	ClientHost string `json:"clientHost"`
	// MemberMetadata contains the metadata corresponding to the current group
	// protocol in use.
	MemberMetadata []byte `json:"memberMetadata"`
	// MemberAssignment contains the current assignment provided by the group
	// leader.
	MemberAssignment []byte `json:"memberAssignment"`
}

type IService added in v0.0.5

// IService is the set of Kafka operations exposed by this package.
// New returns a value of this interface type.
type IService interface {
	// Close closes the Kafka client.
	Close()
	// Info returns Kafka information (see the Info type).
	Info() (res *Info, err error)
	// GetTopics retrieves the topics.
	GetTopics() (res []*TopicInfo, err error)
	// GetTopic retrieves a single topic.
	// NOTE(review): the role of the time argument is not visible here —
	// presumably it selects the offsets reported per partition; confirm.
	GetTopic(topic string, time int64) (res *TopicInfo, err error)
	// Pull pulls messages.
	// NOTE(review): the unit of PullTimeout and the accepted keyType/valueType
	// values are not visible here — confirm.
	Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error)
	// MarkOffset commits the offset.
	MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
	// ResetOffset resets the offset.
	ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
	// CreatePartitions creates partitions for a topic.
	CreatePartitions(topic string, count int32) (err error)
	// CreateTopic creates a topic.
	CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
	// DeleteTopic deletes a topic.
	DeleteTopic(topic string) (err error)
	// DeleteConsumerGroup deletes a consumer group.
	DeleteConsumerGroup(groupId string) (err error)
	// DeleteRecords deletes data from a topic.
	// NOTE(review): partitionOffsets is presumably keyed by partition with the
	// offset as value — confirm.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
	// NewSyncProducer creates a producer.
	NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
	// Push pushes a message.
	Push(msg *Message) (err error)
	// GetOffset gets the latest offset of a partition of a topic.
	GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
	// Partitions gets the partitions of a topic.
	Partitions(topic string) (partitions []int32, err error)
	// ListConsumerGroups lists all consumer groups.
	ListConsumerGroups() (res []*Group, err error)
	// DescribeConsumerGroups queries consumer group details.
	DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
	// DeleteConsumerGroupOffset deletes a consumer group's entry for a topic
	// partition.
	DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error)
	// ListConsumerGroupOffsets queries topic-partition information for a
	// consumer group.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error)
	// RemoveMemberFromConsumerGroup removes members from a consumer group.
	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
	// DescribeTopics returns topic metadata.
	DescribeTopics(topics []string) (res []*TopicMetadata, err error)
	// GetClient gets the Kafka client.
	GetClient() (res sarama.Client, err error)
}

func New

func New(config *Config) (IService, error)

New creates a Kafka service.

type Info added in v0.1.0

// Info is the result type of the Info method; it carries the broker list.
type Info struct {
	// Brokers holds one BrokerInfo per broker.
	Brokers []*BrokerInfo `json:"brokers"`
}

type LeaveGroupResponse added in v0.2.0

// LeaveGroupResponse is the JSON-serializable result of
// RemoveMemberFromConsumerGroup.
// NOTE(review): the fields appear to mirror sarama's LeaveGroupResponse —
// confirm.
type LeaveGroupResponse struct {
	// Version is the protocol version number.
	Version      int16            `json:"version"`
	// ThrottleTime is the throttle time reported with the response.
	// NOTE(review): unit (presumably milliseconds) is not visible here.
	ThrottleTime int32            `json:"throttleTime"`
	// Err is the response-level error as a sarama.KError.
	Err          sarama.KError    `json:"err"`
	// Members holds the per-member results.
	Members      []MemberResponse `json:"members"`
}

type MemberResponse added in v0.2.0

// MemberResponse is the per-member entry of LeaveGroupResponse.Members.
type MemberResponse struct {
	// MemberId is the member ID string.
	MemberId        string        `json:"memberId"`
	// GroupInstanceId is the member's group instance ID; it is a pointer and
	// may be nil.
	GroupInstanceId *string       `json:"groupInstanceId"`
	// Err is the error for this member as a sarama.KError.
	Err             sarama.KError `json:"err"`
}

type Message

// Message is the JSON-serializable form of a Kafka message used by this
// package: Push takes one, Pull returns a slice of them, and
// MessageToProducerMessage / ConsumerMessageToMessage convert to and from the
// sarama message types.
//
// Every field carries the omitempty JSON option, so empty values are left out
// of the encoded JSON.
type Message struct {
	// KeyType names how Key is to be interpreted.
	// NOTE(review): the accepted type names are not visible here — confirm.
	KeyType   string          `json:"keyType,omitempty"`
	// Key is the message key in string form.
	Key       string          `json:"key,omitempty"`
	// ValueType names how Value is to be interpreted.
	// NOTE(review): the accepted type names are not visible here — confirm.
	ValueType string          `json:"valueType,omitempty"`
	// Value is the message value in string form.
	Value     string          `json:"value,omitempty"`
	// Topic is the topic name.
	Topic     string          `json:"topic,omitempty"`
	// Partition is the partition number. Being a pointer with omitempty, nil
	// is omitted from JSON while a pointer to 0 is still encoded.
	Partition *int32          `json:"partition,omitempty"`
	// Offset is the message offset. Being a pointer with omitempty, nil is
	// omitted from JSON while a pointer to 0 is still encoded.
	Offset    *int64          `json:"offset,omitempty"`
	// Headers holds the message headers.
	Headers   []MessageHeader `json:"headers,omitempty"`
	// Timestamp is the message timestamp; nil is omitted from JSON.
	Timestamp *time.Time      `json:"timestamp,omitempty"`
}

func ConsumerMessageToMessage

func ConsumerMessageToMessage(keyType string, valueType string, consumerMessage *sarama.ConsumerMessage) (msg *Message, err error)

type MessageHeader

// MessageHeader is a single key/value header of a Message, with both parts
// held as strings. Empty parts are omitted from the encoded JSON.
type MessageHeader struct {
	// Key is the header key.
	Key   string `json:"key,omitempty"`
	// Value is the header value.
	Value string `json:"value,omitempty"`
}

type OffsetFetchResponse added in v0.2.0

// OffsetFetchResponse is the JSON-serializable result of
// ListConsumerGroupOffsets.
// NOTE(review): the fields appear to mirror sarama's OffsetFetchResponse —
// confirm.
type OffsetFetchResponse struct {
	// Version is the protocol version number.
	Version        int16                                          `json:"version"`
	// ThrottleTimeMs is the throttle time; the name indicates milliseconds.
	ThrottleTimeMs int32                                          `json:"throttleTimeMs"`
	// Blocks is a two-level map of result blocks.
	// NOTE(review): presumably keyed by topic name, then partition number,
	// matching the topicPartitions argument of ListConsumerGroupOffsets —
	// confirm.
	Blocks         map[string]map[int32]*OffsetFetchResponseBlock `json:"blocks"`
	// Err is the response-level error as a sarama.KError.
	Err            sarama.KError                                  `json:"err"`
}

type OffsetFetchResponseBlock added in v0.2.0

// OffsetFetchResponseBlock is one entry of OffsetFetchResponse.Blocks.
type OffsetFetchResponseBlock struct {
	// Offset is the offset value of this block.
	Offset      int64         `json:"offset"`
	// LeaderEpoch is the leader epoch value of this block.
	LeaderEpoch int32         `json:"leaderEpoch"`
	// Metadata is the metadata string of this block.
	Metadata    string        `json:"metadata"`
	// Err is the error for this block as a sarama.KError.
	Err         sarama.KError `json:"err"`
}

type PartitionMetadata added in v0.2.4

// PartitionMetadata is the JSON-serializable metadata of one partition;
// TopicMetadata.Partitions holds pointers to these.
type PartitionMetadata struct {
	// Version defines the protocol version to use for encode and decode
	Version int16 `json:"version"`
	// Err contains the partition error, or 0 if there was no error.
	Err sarama.KError `json:"err"`
	// ID contains the partition index.
	// NOTE: the JSON key is upper-case "ID", unlike the lower-camel-case keys
	// of the other fields; changing it would alter the encoded output.
	ID int32 `json:"ID"`
	// Leader contains the ID of the leader broker.
	Leader int32 `json:"leader"`
	// LeaderEpoch contains the leader epoch of this partition.
	LeaderEpoch int32 `json:"leaderEpoch"`
	// Replicas contains the set of all nodes that host this partition.
	Replicas []int32 `json:"replicas"`
	// Isr contains the set of nodes that are in sync with the leader for this partition.
	Isr []int32 `json:"isr"`
	// OfflineReplicas contains the set of offline replicas of this partition.
	OfflineReplicas []int32 `json:"offlineReplicas"`
}

type ProducerMessage

// ProducerMessage wraps a *sarama.ProducerMessage by embedding it, so the
// embedded type's fields and methods are promoted onto ProducerMessage.
// The embedded pointer is nil in the zero value.
type ProducerMessage struct {
	*sarama.ProducerMessage
}

type Service added in v0.0.5

// Service is the concrete type whose *Service method set, as listed in this
// package's documentation, matches the IService interface (plus GetServers).
// It embeds *Config, so Config's fields are promoted onto Service; the
// embedded pointer is nil in the zero value.
type Service struct {
	*Config
}

Service 注册处理器在线信息等

func (*Service) Close added in v0.5.7

func (this_ *Service) Close()

func (*Service) CreatePartitions added in v0.0.5

func (this_ *Service) CreatePartitions(topic string, count int32) (err error)

func (*Service) CreateTopic added in v0.0.5

func (this_ *Service) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)

func (*Service) DeleteConsumerGroup added in v0.0.5

func (this_ *Service) DeleteConsumerGroup(groupId string) (err error)

func (*Service) DeleteConsumerGroupOffset added in v0.1.9

func (this_ *Service) DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error)

func (*Service) DeleteRecords added in v0.0.5

func (this_ *Service) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)

func (*Service) DeleteTopic added in v0.0.5

func (this_ *Service) DeleteTopic(topic string) (err error)

func (*Service) DescribeConsumerGroups added in v0.1.9

func (this_ *Service) DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)

func (*Service) DescribeTopics added in v0.2.4

func (this_ *Service) DescribeTopics(topics []string) (res []*TopicMetadata, err error)

func (*Service) GetClient added in v0.1.9

func (this_ *Service) GetClient() (res sarama.Client, err error)

func (*Service) GetOffset added in v0.1.6

func (this_ *Service) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)

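GetOffset queries the cluster for the most recent available offset at the given time (in milliseconds) on the topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the offset of the message that will be produced next, or a time.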

func (*Service) GetServers added in v0.0.5

func (this_ *Service) GetServers() []string

func (*Service) GetTopic added in v0.1.8

func (this_ *Service) GetTopic(topic string, time int64) (res *TopicInfo, err error)

func (*Service) GetTopics added in v0.0.5

func (this_ *Service) GetTopics() (res []*TopicInfo, err error)

func (*Service) Info added in v0.0.5

func (this_ *Service) Info() (res *Info, err error)

func (*Service) ListConsumerGroupOffsets added in v0.1.9

func (this_ *Service) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error)

func (*Service) ListConsumerGroups added in v0.1.9

func (this_ *Service) ListConsumerGroups() (res []*Group, err error)

func (*Service) MarkOffset added in v0.0.5

func (this_ *Service) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)

func (*Service) NewSyncProducer added in v0.0.5

func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)

NewSyncProducer creates a producer.

func (*Service) Partitions added in v0.1.6

func (this_ *Service) Partitions(topic string) (partitions []int32, err error)

func (*Service) Pull added in v0.0.5

func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error)

func (*Service) Push added in v0.0.5

func (this_ *Service) Push(msg *Message) (err error)

Push pushes a message to Kafka.

func (*Service) RemoveMemberFromConsumerGroup added in v0.2.0

func (this_ *Service) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)

func (*Service) ResetOffset added in v0.0.5

func (this_ *Service) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)

type TopicInfo

// TopicInfo describes one topic and its partitions; it is the element type
// returned by GetTopics and the result type of GetTopic.
type TopicInfo struct {
	// Topic is the topic name.
	Topic      string            `json:"topic"`
	// Partitions holds one TopicPartition per partition.
	Partitions []*TopicPartition `json:"partitions"`
}

type TopicMetadata added in v0.2.4

// TopicMetadata is the JSON-serializable metadata of one topic, as returned by
// DescribeTopics.
type TopicMetadata struct {
	// Version defines the protocol version to use for encode and decode
	Version int16 `json:"version"`
	// Err contains the topic error, or 0 if there was no error.
	Err sarama.KError `json:"err"`
	// Name contains the topic name.
	Name string `json:"name"`
	// IsInternal contains a True if the topic is internal.
	IsInternal bool `json:"isInternal"`
	// Partitions contains each partition in the topic.
	Partitions []*PartitionMetadata `json:"partitions"`
}

type TopicPartition added in v0.1.6

// TopicPartition describes one partition of a topic; TopicInfo.Partitions
// holds pointers to these.
type TopicPartition struct {
	// Partition is the partition number.
	Partition int32   `json:"partition"`
	// Offset is an offset value for this partition.
	// NOTE(review): which offset (newest, oldest, or at a given time) is not
	// visible here — confirm against GetTopic/GetTopics.
	Offset    int64   `json:"offset"`
	// Replicas lists replica IDs for this partition.
	// NOTE(review): presumably broker IDs — confirm.
	Replicas  []int32 `json:"replicas"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL