Documentation ¶
Index ¶
- func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
- func NewServiceScript(config interface{}) (res map[string]interface{}, err error)
- type BrokerInfo
- type Config
- type Group
- type GroupDescription
- type GroupMemberDescription
- type IService
- type Info
- type LeaveGroupResponse
- type MemberResponse
- type Message
- type MessageHeader
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type PartitionMetadata
- type ProducerMessage
- type Service
- func (this_ *Service) Close()
- func (this_ *Service) CreatePartitions(topic string, count int32) (err error)
- func (this_ *Service) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
- func (this_ *Service) DeleteConsumerGroup(groupId string) (err error)
- func (this_ *Service) DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error)
- func (this_ *Service) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
- func (this_ *Service) DeleteTopic(topic string) (err error)
- func (this_ *Service) DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
- func (this_ *Service) DescribeTopics(topics []string) (res []*TopicMetadata, err error)
- func (this_ *Service) GetClient() (res sarama.Client, err error)
- func (this_ *Service) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
- func (this_ *Service) GetServers() []string
- func (this_ *Service) GetTopic(topic string, time int64) (res *TopicInfo, err error)
- func (this_ *Service) GetTopics() (res []*TopicInfo, err error)
- func (this_ *Service) Info() (res *Info, err error)
- func (this_ *Service) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error)
- func (this_ *Service) ListConsumerGroups() (res []*Group, err error)
- func (this_ *Service) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
- func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
- func (this_ *Service) Partitions(topic string) (partitions []int32, err error)
- func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTimeout int, ...) (msgList []*Message, err error)
- func (this_ *Service) Push(msg *Message) (err error)
- func (this_ *Service) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
- func (this_ *Service) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
- type TopicInfo
- type TopicMetadata
- type TopicPartition
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MessageToProducerMessage ¶
func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
func NewServiceScript ¶ added in v0.5.7
Types ¶
type BrokerInfo ¶ added in v0.1.0
type Config ¶
type Config struct { Address string `json:"address"` Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` CertPath string `json:"certPath,omitempty"` }
Config kafka配置
type GroupDescription ¶ added in v0.2.0
type GroupDescription struct { // Version defines the protocol version to use for encode and decode Version int16 `json:"version"` // Err contains the describe error as the KError type. Err sarama.KError `json:"err"` // ErrorCode contains the describe error, or 0 if there was no error. ErrorCode int16 `json:"errorCode"` // GroupId contains the group ID string. GroupId string `json:"groupId"` // State contains the group state string, or the empty string. State string `json:"state"` // ProtocolType contains the group protocol type, or the empty string. ProtocolType string `json:"protocolType"` // Protocol contains the group protocol data, or the empty string. Protocol string `json:"protocol"` // Members contains the group members. Members map[string]*GroupMemberDescription `json:"members"` // AuthorizedOperations contains a 32-bit bitfield to represent authorized // operations for this group. AuthorizedOperations int32 `json:"authorizedOperations"` }
type GroupMemberDescription ¶ added in v0.2.0
type GroupMemberDescription struct { // Version defines the protocol version to use for encode and decode Version int16 `json:"version"` // MemberId contains the member ID assigned by the group coordinator. MemberId string `json:"memberId"` // GroupInstanceId contains the unique identifier of the consumer instance // provided by end user. GroupInstanceId *string `json:"groupInstanceId"` // ClientId contains the client ID used in the member's latest join group // request. ClientId string `json:"clientId"` // ClientHost contains the client host. ClientHost string `json:"clientHost"` // MemberMetadata contains the metadata corresponding to the current group // protocol in use. MemberMetadata []byte `json:"memberMetadata"` // MemberAssignment contains the current assignment provided by the group // leader. MemberAssignment []byte `json:"memberAssignment"` }
type IService ¶ added in v0.0.5
type IService interface { // Close 关闭 kafka 客户端 Close() // Info 查看 kafka 信息 Info() (res *Info, err error) // GetTopics 获取主题 GetTopics() (res []*TopicInfo, err error) // GetTopic 获取主题 GetTopic(topic string, time int64) (res *TopicInfo, err error) // Pull 拉取消息 Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error) // MarkOffset 提交 位置 MarkOffset(groupId string, topic string, partition int32, offset int64) (err error) // ResetOffset 重置 位置 ResetOffset(groupId string, topic string, partition int32, offset int64) (err error) // CreatePartitions 创建 主题 分区 CreatePartitions(topic string, count int32) (err error) // CreateTopic 创建主题 CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error) // DeleteTopic 删除 主题 DeleteTopic(topic string) (err error) // DeleteConsumerGroup 删除 某个 消费组 DeleteConsumerGroup(groupId string) (err error) // DeleteRecords 删除 主题 数据 DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error) // NewSyncProducer 创建 提供者 NewSyncProducer() (syncProducer sarama.SyncProducer, err error) // Push 推送 Push(msg *Message) (err error) // GetOffset 获取 主题 某个 分区 最新 位置 GetOffset(topic string, partitionID int32, time int64) (offset int64, err error) // Partitions 获取 主题 分区 Partitions(topic string) (partitions []int32, err error) // ListConsumerGroups 查询 所有 消费组 ListConsumerGroups() (res []*Group, err error) // DescribeConsumerGroups 查询 消费组 明细 DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error) // DeleteConsumerGroupOffset 删除 消费组 某个主题 分区 DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error) // ListConsumerGroupOffsets 查询 消费组 主题分区 信息 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error) // RemoveMemberFromConsumerGroup 删除 消费组 成员 RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error) // DescribeTopics 主题 元数据 DescribeTopics(topics []string) (res []*TopicMetadata, err error) // GetClient 获取 kafka 客户端 GetClient() (res sarama.Client, err error) }
type Info ¶ added in v0.1.0
type Info struct {
Brokers []*BrokerInfo `json:"brokers"`
}
type LeaveGroupResponse ¶ added in v0.2.0
type LeaveGroupResponse struct { Version int16 `json:"version"` ThrottleTime int32 `json:"throttleTime"` Err sarama.KError `json:"err"` Members []MemberResponse `json:"members"` }
type MemberResponse ¶ added in v0.2.0
type Message ¶
type Message struct { KeyType string `json:"keyType,omitempty"` Key string `json:"key,omitempty"` ValueType string `json:"valueType,omitempty"` Value string `json:"value,omitempty"` Topic string `json:"topic,omitempty"` Partition *int32 `json:"partition,omitempty"` Offset *int64 `json:"offset,omitempty"` Headers []MessageHeader `json:"headers,omitempty"` Timestamp *time.Time `json:"timestamp,omitempty"` }
type MessageHeader ¶
type OffsetFetchResponse ¶ added in v0.2.0
type OffsetFetchResponseBlock ¶ added in v0.2.0
type PartitionMetadata ¶ added in v0.2.4
type PartitionMetadata struct { // Version defines the protocol version to use for encode and decode Version int16 `json:"version"` // Err contains the partition error, or 0 if there was no error. Err sarama.KError `json:"err"` // ID contains the partition index. ID int32 `json:"ID"` // Leader contains the ID of the leader broker. Leader int32 `json:"leader"` // LeaderEpoch contains the leader epoch of this partition. LeaderEpoch int32 `json:"leaderEpoch"` // Replicas contains the set of all nodes that host this partition. Replicas []int32 `json:"replicas"` // Isr contains the set of nodes that are in sync with the leader for this partition. Isr []int32 `json:"isr"` // OfflineReplicas contains the set of offline replicas of this partition. OfflineReplicas []int32 `json:"offlineReplicas"` }
type ProducerMessage ¶
type ProducerMessage struct {
*sarama.ProducerMessage
}
type Service ¶ added in v0.0.5
type Service struct {
*Config
}
Service 注册处理器在线信息等
func (*Service) CreatePartitions ¶ added in v0.0.5
func (*Service) CreateTopic ¶ added in v0.0.5
func (*Service) DeleteConsumerGroup ¶ added in v0.0.5
func (*Service) DeleteConsumerGroupOffset ¶ added in v0.1.9
func (*Service) DeleteRecords ¶ added in v0.0.5
func (*Service) DeleteTopic ¶ added in v0.0.5
func (*Service) DescribeConsumerGroups ¶ added in v0.1.9
func (this_ *Service) DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
func (*Service) DescribeTopics ¶ added in v0.2.4
func (this_ *Service) DescribeTopics(topics []string) (res []*TopicMetadata, err error)
func (*Service) GetOffset ¶ added in v0.1.6
func (this_ *Service) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
GetOffset 查询集群以获取 主题/分区组合上的给定时间(以毫秒为单位)。 对于最早的可用偏移,时间应该是OffsetOldest, OffsetNewest是下一次或某一时间将生成的消息的偏移量。
func (*Service) GetServers ¶ added in v0.0.5
func (*Service) ListConsumerGroupOffsets ¶ added in v0.1.9
func (*Service) ListConsumerGroups ¶ added in v0.1.9
func (*Service) MarkOffset ¶ added in v0.0.5
func (*Service) NewSyncProducer ¶ added in v0.0.5
func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
NewSyncProducer 创建生产者
func (*Service) Partitions ¶ added in v0.1.6
func (*Service) RemoveMemberFromConsumerGroup ¶ added in v0.2.0
func (this_ *Service) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
type TopicInfo ¶
type TopicInfo struct { Topic string `json:"topic"` Partitions []*TopicPartition `json:"partitions"` }
type TopicMetadata ¶ added in v0.2.4
type TopicMetadata struct { // Version defines the protocol version to use for encode and decode Version int16 `json:"version"` // Err contains the topic error, or 0 if there was no error. Err sarama.KError `json:"err"` // Name contains the topic name. Name string `json:"name"` // IsInternal contains a True if the topic is internal. IsInternal bool `json:"isInternal"` // Partitions contains each partition in the topic. Partitions []*PartitionMetadata `json:"partitions"` }
type TopicPartition ¶ added in v0.1.6
Click to show internal directories.
Click to hide internal directories.