Documentation ¶
Index ¶
- Variables
- func ConvertKafkaPacketToMQConsumerMessage(packet *KafkaPacket) mqenv.MQConsumerMessage
- func StopKafka(mqConnName string) error
- type Base
- func (b *Base) ConfigHeartbeatInterval(interval int)
- func (b *Base) ConfigKerberosKeyTab(kerberosKeyTab string)
- func (b *Base) ConfigKerberosPrincipal(kerberosPrincipal string)
- func (b *Base) ConfigKerberosServiceName(name string)
- func (b *Base) ConfigPartition(partition int)
- func (b *Base) ConfigReconnectInterval(interval int)
- func (b *Base) ConfigSaslMechanisms(saslMechanisms string)
- func (b *Base) ConfigSaslPassword(saslPassword string)
- func (b *Base) ConfigSaslUserName(saslUsername string)
- func (b *Base) ConfigSecurityProtocol(securityProtocol string)
- func (b *Base) ConfigServers(servers string)
- func (b *Base) ConfigSessionTimeout(timeout int)
- func (b *Base) SetCompletionCallback(callback func(messages []k.Message, err error))
- type CallBack
- type Config
- type Consumer
- type InstStats
- type KafkaPacket
- func (*KafkaPacket) Descriptor() ([]byte, []int)deprecated
- func (x *KafkaPacket) GetAppId() string
- func (x *KafkaPacket) GetBody() []byte
- func (x *KafkaPacket) GetConsumerTag() string
- func (x *KafkaPacket) GetContentEncoding() string
- func (x *KafkaPacket) GetContentType() string
- func (x *KafkaPacket) GetCorrelationId() string
- func (x *KafkaPacket) GetErrorMessage() string
- func (x *KafkaPacket) GetExchange() string
- func (x *KafkaPacket) GetGroupId() string
- func (x *KafkaPacket) GetHeaders() []*KafkaPacket_Header
- func (x *KafkaPacket) GetMessageId() string
- func (x *KafkaPacket) GetReplyTo() string
- func (x *KafkaPacket) GetRoutingKey() string
- func (x *KafkaPacket) GetSendTo() string
- func (x *KafkaPacket) GetStatusCode() uint32
- func (x *KafkaPacket) GetTimestamp() uint64
- func (x *KafkaPacket) GetType() string
- func (x *KafkaPacket) GetUserId() string
- func (*KafkaPacket) ProtoMessage()
- func (x *KafkaPacket) ProtoReflect() protoreflect.Message
- func (x *KafkaPacket) Reset()
- func (x *KafkaPacket) String() string
- type KafkaPacket_Header
- func (*KafkaPacket_Header) Descriptor() ([]byte, []int)deprecated
- func (x *KafkaPacket_Header) GetName() string
- func (x *KafkaPacket_Header) GetValue() string
- func (*KafkaPacket_Header) ProtoMessage()
- func (x *KafkaPacket_Header) ProtoReflect() protoreflect.Message
- func (x *KafkaPacket_Header) Reset()
- func (x *KafkaPacket_Header) String() string
- type KafkaWorker
- type Producer
- type Stats
- type Worker
Constants ¶
This section is empty.
Variables ¶
var File_kafkapacket_proto protoreflect.FileDescriptor
Functions ¶
func ConvertKafkaPacketToMQConsumerMessage ¶ added in v0.2.5
func ConvertKafkaPacketToMQConsumerMessage(packet *KafkaPacket) mqenv.MQConsumerMessage
ConvertKafkaPacketToMQConsumerMessage 把接收到的kafkaPacket 数据转换成MQConsumerMessage.
Types ¶
type Base ¶ added in v0.2.5
type Base struct { Partition int // partition 分区 Config map[string]interface{} // kafka 的配置字典 CompletionCallback func(messages []k.Message, err error) // 发送状态通知函数 }
Base .
func (*Base) ConfigHeartbeatInterval ¶ added in v0.2.5
ConfigHeartbeatInterval 配置心跳检测间隔.
func (*Base) ConfigKerberosKeyTab ¶ added in v0.2.5
ConfigKerberosKeyTab 使用kerberos 认证需要配置.
func (*Base) ConfigKerberosPrincipal ¶ added in v0.2.5
ConfigKerberosPrincipal 使用kerberos 认证需要配置.
func (*Base) ConfigKerberosServiceName ¶ added in v0.2.5
ConfigKerberosServiceName 使用kerberos 认证需要配置.
func (*Base) ConfigPartition ¶ added in v0.2.5
ConfigPartition 配置分区,如partition 为0.
func (*Base) ConfigReconnectInterval ¶ added in v0.2.5
ConfigReconnectInterval 配置断线重连的时间间隔,单位是毫秒.
func (*Base) ConfigSaslMechanisms ¶ added in v0.2.5
ConfigSaslMechanisms 使用plain 认证需要配置,可以使用PLAIN.
func (*Base) ConfigSaslPassword ¶ added in v0.2.5
ConfigSaslPassword 使用plain 认证需要配置.
func (*Base) ConfigSaslUserName ¶ added in v0.2.5
ConfigSaslUserName 使用plain 认证需要配置.
func (*Base) ConfigSecurityProtocol ¶ added in v0.2.5
ConfigSecurityProtocol 使用plain 和kerberos 认证需要配置,如sasl_plaintext.
func (*Base) ConfigServers ¶ added in v0.2.5
ConfigServers 配置连接的服务器,如"localhost:9092,localhost:9093".
func (*Base) ConfigSessionTimeout ¶ added in v0.2.5
ConfigSessionTimeout 配置会话超时.
type Config ¶
type Config struct { Hosts string Partition int PrivateTopic string GroupID string MaxPollIntervalMS int // 消息类型: //direct:组播,订阅同一个topic,消费者组会相同,一条消息只会被组内一个消费者接收 //fanout:广播,订阅同一个topic,但是消费者组会使用uuid,所有组都会收到信息 MessageType string `yaml:"messageType" json:"messageType"` // kerberos 认证需要配置 KerberosServiceName string KerberosKeytab string KerberosPrincipal string // plain 认证需要配置 SaslMechanisms string SaslUsername string SaslPassword string UseOriginalContent bool `yaml:"useOriginalContent" json:"useOriginalContent"` }
Config kafkav2 配置参数.
type Consumer ¶ added in v0.2.5
type Consumer struct { Base Readers map[string]*k.Reader // 每一个topic 一个reader Brokers []string // kafka 的节点 OffsetDict map[string]int64 // 记录偏移量,避免在连接断开重连时候重复处理信息 // contains filtered or unexported fields }
Consumer 消费者.
func NewConsumer ¶ added in v0.2.5
NewConsumer 实例化返回消费者.
func (*Consumer) ConfigGroupID ¶ added in v0.2.5
ConfigGroupID 配置group id.
func (*Consumer) ConfigMaxPollIntervalMS ¶ added in v0.2.5
ConfigMaxPollIntervalMS 配置两次拉取数据之间的最大间隔.
func (*Consumer) Receive ¶ added in v0.2.5
Receive 订阅topic,处理消息. @title Receive @param topic 订阅的topic @param callback ,处理接收到的信息,入参是 接收到的[]byte
func (*Consumer) StopConsumer ¶ added in v0.2.5
func (c *Consumer) StopConsumer()
StopConsumer 停止消费.
type InstStats ¶
type InstStats struct { Bytes int64 `json:"bytes"` Dials int64 `json:"connections"` Topic string `json:"topic"` Messages int64 `json:"messages"` Rebalances int64 `json:"rebalances"` Errors int64 `json:"errors"` Timeouts int64 `json:"timeouts"` ClientID string `json:"clientID"` QueueLength int64 `json:"queueLength"` QueueCapacity int64 `json:"queueCapacity"` }
InstStats .
type KafkaPacket ¶ added in v0.2.5
type KafkaPacket struct { // Properties ContentType string `protobuf:"bytes,1,opt,name=contentType,proto3" json:"contentType,omitempty"` // MIME content type ContentEncoding string `protobuf:"bytes,2,opt,name=contentEncoding,proto3" json:"contentEncoding,omitempty"` // MIME content encoding SendTo string `protobuf:"bytes,3,opt,name=sendTo,proto3" json:"sendTo,omitempty"` // application use - address to send to (ex: RPC) GroupId string `protobuf:"bytes,4,opt,name=groupId,proto3" json:"groupId,omitempty"` // application use - kafka group id CorrelationId string `protobuf:"bytes,5,opt,name=correlationId,proto3" json:"correlationId,omitempty"` // application use - correlation identifier ReplyTo string `protobuf:"bytes,6,opt,name=replyTo,proto3" json:"replyTo,omitempty"` // application use - address to reply to (ex: RPC) MessageId string `protobuf:"bytes,7,opt,name=messageId,proto3" json:"messageId,omitempty"` // application use - message identifier Timestamp uint64 `protobuf:"varint,8,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // application use - message timestamp Type string `protobuf:"bytes,9,opt,name=type,proto3" json:"type,omitempty"` // application use - message type name UserId string `protobuf:"bytes,10,opt,name=userId,proto3" json:"userId,omitempty"` // application use - creating user - should be authenticated user AppId string `protobuf:"bytes,11,opt,name=appId,proto3" json:"appId,omitempty"` // application use - creating application id StatusCode uint32 `protobuf:"varint,12,opt,name=statusCode,proto3" json:"statusCode,omitempty"` // application response use - message response status ErrorMessage string `protobuf:"bytes,13,opt,name=errorMessage,proto3" json:"errorMessage,omitempty"` // application response use - error message Headers []*KafkaPacket_Header `protobuf:"bytes,14,rep,name=headers,proto3" json:"headers,omitempty"` // Application or header exchange table Body []byte `protobuf:"bytes,15,opt,name=body,proto3" json:"body,omitempty"` RoutingKey string `protobuf:"bytes,16,opt,name=routingKey,proto3" json:"routingKey,omitempty"` // application use - delivery request ConsumerTag string `protobuf:"bytes,17,opt,name=consumerTag,proto3" json:"consumerTag,omitempty"` Exchange string `protobuf:"bytes,18,opt,name=exchange,proto3" json:"exchange,omitempty"` // contains filtered or unexported fields }
func (*KafkaPacket) Descriptor
deprecated
added in
v0.2.5
func (*KafkaPacket) Descriptor() ([]byte, []int)
Deprecated: Use KafkaPacket.ProtoReflect.Descriptor instead.
func (*KafkaPacket) GetAppId ¶ added in v0.2.5
func (x *KafkaPacket) GetAppId() string
func (*KafkaPacket) GetBody ¶ added in v0.2.5
func (x *KafkaPacket) GetBody() []byte
func (*KafkaPacket) GetConsumerTag ¶ added in v0.2.6
func (x *KafkaPacket) GetConsumerTag() string
func (*KafkaPacket) GetContentEncoding ¶ added in v0.2.5
func (x *KafkaPacket) GetContentEncoding() string
func (*KafkaPacket) GetContentType ¶ added in v0.2.5
func (x *KafkaPacket) GetContentType() string
func (*KafkaPacket) GetCorrelationId ¶ added in v0.2.5
func (x *KafkaPacket) GetCorrelationId() string
func (*KafkaPacket) GetErrorMessage ¶ added in v0.2.5
func (x *KafkaPacket) GetErrorMessage() string
func (*KafkaPacket) GetExchange ¶ added in v0.2.6
func (x *KafkaPacket) GetExchange() string
func (*KafkaPacket) GetGroupId ¶ added in v0.2.5
func (x *KafkaPacket) GetGroupId() string
func (*KafkaPacket) GetHeaders ¶ added in v0.2.5
func (x *KafkaPacket) GetHeaders() []*KafkaPacket_Header
func (*KafkaPacket) GetMessageId ¶ added in v0.2.5
func (x *KafkaPacket) GetMessageId() string
func (*KafkaPacket) GetReplyTo ¶ added in v0.2.5
func (x *KafkaPacket) GetReplyTo() string
func (*KafkaPacket) GetRoutingKey ¶ added in v0.2.6
func (x *KafkaPacket) GetRoutingKey() string
func (*KafkaPacket) GetSendTo ¶ added in v0.2.5
func (x *KafkaPacket) GetSendTo() string
func (*KafkaPacket) GetStatusCode ¶ added in v0.2.5
func (x *KafkaPacket) GetStatusCode() uint32
func (*KafkaPacket) GetTimestamp ¶ added in v0.2.5
func (x *KafkaPacket) GetTimestamp() uint64
func (*KafkaPacket) GetType ¶ added in v0.2.5
func (x *KafkaPacket) GetType() string
func (*KafkaPacket) GetUserId ¶ added in v0.2.5
func (x *KafkaPacket) GetUserId() string
func (*KafkaPacket) ProtoMessage ¶ added in v0.2.5
func (*KafkaPacket) ProtoMessage()
func (*KafkaPacket) ProtoReflect ¶ added in v0.2.5
func (x *KafkaPacket) ProtoReflect() protoreflect.Message
func (*KafkaPacket) Reset ¶ added in v0.2.5
func (x *KafkaPacket) Reset()
func (*KafkaPacket) String ¶ added in v0.2.5
func (x *KafkaPacket) String() string
type KafkaPacket_Header ¶ added in v0.2.5
type KafkaPacket_Header struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // contains filtered or unexported fields }
func (*KafkaPacket_Header) Descriptor
deprecated
added in
v0.2.5
func (*KafkaPacket_Header) Descriptor() ([]byte, []int)
Deprecated: Use KafkaPacket_Header.ProtoReflect.Descriptor instead.
func (*KafkaPacket_Header) GetName ¶ added in v0.2.5
func (x *KafkaPacket_Header) GetName() string
func (*KafkaPacket_Header) GetValue ¶ added in v0.2.5
func (x *KafkaPacket_Header) GetValue() string
func (*KafkaPacket_Header) ProtoMessage ¶ added in v0.2.5
func (*KafkaPacket_Header) ProtoMessage()
func (*KafkaPacket_Header) ProtoReflect ¶ added in v0.2.5
func (x *KafkaPacket_Header) ProtoReflect() protoreflect.Message
func (*KafkaPacket_Header) Reset ¶ added in v0.2.5
func (x *KafkaPacket_Header) Reset()
func (*KafkaPacket_Header) String ¶ added in v0.2.5
func (x *KafkaPacket_Header) String() string
type KafkaWorker ¶ added in v0.2.5
type KafkaWorker struct { Producer *Producer // 生产者 Consumer *Consumer // 消费者 PrivateTopic string // 私有topic,用于发出信息后收到回复 ContentType string //序列化类型,如json ContentEncoding string // 编码格式 GroupID string //组id,会包含在 kafkapacket 数据包中 MsgType string // 消息类型 Stats Stats // 统计信息 UseOriginalContent bool // 是否使用原始的方式序列化(使用json 序列化,而不是protobuf) // contains filtered or unexported fields }
KafkaWorker 把生产者、消费者结合起来,实现请求响应模式.
func InitKafka ¶
func InitKafka(mqConnName string, config Config) (*KafkaWorker, error)
InitKafka 初始化kafka.
func NewKafkaWorker ¶ added in v0.2.5
func NewKafkaWorker(hosts string, partition int, privateTopic, groupID string) *KafkaWorker
NewKafkaWorker 实例化一个kafka worker.
func (*KafkaWorker) Send ¶ added in v0.2.5
func (worker *KafkaWorker) Send(topic string, publishMsg *mqenv.MQPublishMessage, needReply bool) (*mqenv.MQConsumerMessage, error)
Send 发送信息.
func (*KafkaWorker) Subscribe ¶ added in v0.2.5
func (worker *KafkaWorker) Subscribe(topic string, consumeProxy *mqenv.MQConsumerProxy) error
Subscribe 订阅topic.
type Producer ¶ added in v0.2.5
Producer 生产者.
func NewProducer ¶ added in v0.2.5
NewProducer 返回一个生产者.