Documentation ¶
Index ¶
- Variables
- type ChainArgs
- type Config
- type Consumer
- type ConsumerConfig
- type ConsumerTopic
- type DefaultEventHandler
- func (e DefaultEventHandler) OnCMConsumed(_ *HistoryRecord)
- func (e DefaultEventHandler) OnConsumerClosed(_ string)
- func (e DefaultEventHandler) OnConsumerHeartbeatTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnConsumerRegister(_ string)
- func (e DefaultEventHandler) OnConsumerRegisterTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnFrameParseError(_ *proto.TransferFrame, _ transfer.Conn)
- func (e DefaultEventHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) error
- func (e DefaultEventHandler) OnProducerClosed(_ string)
- func (e DefaultEventHandler) OnProducerHeartbeatTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnProducerRegister(_ string)
- func (e DefaultEventHandler) OnProducerRegisterTimeout(event TimeoutEvent)
- type EPool
- type Engine
- func (e *Engine) AddTopic(name []byte) *Topic
- func (e *Engine) BindMessageHandler(m proto.Message, ack proto.Message, handler HookHandler, text string) error
- func (e *Engine) Crypto() proto.Crypto
- func (e *Engine) Ctx() context.Context
- func (e *Engine) EventHandler() EventHandler
- func (e *Engine) GetTopic(name []byte) *Topic
- func (e *Engine) GetTopicOffset(name []byte) uint64
- func (e *Engine) HeartbeatInterval() float64
- func (e *Engine) IsTokenCorrect(token string) bool
- func (e *Engine) Logger() logger.Iface
- func (e *Engine) NeedToken() bool
- func (e *Engine) ProducerSendInterval() time.Duration
- func (e *Engine) Publisher(msg *proto.PMessage) uint64
- func (e *Engine) QueryConsumer(addr string) (*Consumer, bool)
- func (e *Engine) QueryProducer(addr string) (*Producer, bool)
- func (e *Engine) RangeConsumer(fn func(c *Consumer) bool)
- func (e *Engine) RangeProducer(fn func(p *Producer) bool)
- func (e *Engine) RangeTopic(fn func(topic *Topic) bool)
- func (e *Engine) RemoveConsumer(addr string)
- func (e *Engine) RemoveProducer(addr string)
- func (e *Engine) ReplaceTransfer(transfer transfer.Transfer) *Engine
- func (e *Engine) Serve() error
- func (e *Engine) SetCrypto(crypto proto.Crypto) *Engine
- func (e *Engine) SetCryptoPlan(option string, key ...string) *Engine
- func (e *Engine) SetEventHandler(handler EventHandler) *Engine
- func (e *Engine) SetTopicHistoryBufferSize(size int) *Engine
- func (e *Engine) Stat() *Statistic
- func (e *Engine) Stop()
- func (e *Engine) TokenCrypto() *proto.TokenCrypto
- type EventHandler
- type FlowHandler
- type HistoryRecord
- type HistoryRecordStatus
- type Hook
- type HookHandler
- type Monitor
- func (k *Monitor) Do(ctx context.Context) error
- func (k *Monitor) Interval() time.Duration
- func (k *Monitor) OnClientClosed(addr string)
- func (k *Monitor) OnClientConnected(addr string)
- func (k *Monitor) OnClientHeartbeat(addr string)
- func (k *Monitor) OnClientRegistered(addr string, linkType proto.LinkType)
- func (k *Monitor) OnStartup()
- func (k *Monitor) ReadClientTimeInfo(addr string, linkType proto.LinkType) *TimeInfo
- func (k *Monitor) String() string
- type Producer
- type ProducerConfig
- type Statistic
- type TimeInfo
- type TimeoutEvent
- type TimeoutEventType
- type Topic
- func (t *Topic) AddConsumer(con *Consumer)
- func (t *Topic) LatestMessage() *HistoryRecord
- func (t *Topic) Publisher(pm *proto.PMessage) uint64
- func (t *Topic) RangeConsumer(fn func(c *Consumer))
- func (t *Topic) RemoveConsumer(addr string)
- func (t *Topic) SetCrypto(crypto proto.Crypto) *Topic
- func (t *Topic) SetOnConsumed(onConsumed func(record *HistoryRecord)) *Topic
- type TopicConsumer
- type TopicOffset
Constants ¶
This section is empty.
Variables ¶
var ( ErrConsumerNotRegister = errors.New("consumer not register") ErrProducerNotRegister = errors.New("producer not register") ErrPMNotFound = errors.New("producer-message not found in frame") // ErrNoNeedToReply 不再回复响应给客户端 ErrNoNeedToReply = errors.New("no need to reply to the client") )
Functions ¶
This section is empty.
Types ¶
type ChainArgs ¶ added in v0.3.2
type ChainArgs struct {
// contains filtered or unexported fields
}
func (*ChainArgs) ReplyClient ¶ added in v0.3.3
ReplyClient 是否需要回复客户端, 只要未显式设置不回复, 均需要回复响应给客户端
type Config ¶
type Config struct { Host string `json:"host"` Port string `json:"port"` MaxOpenConn int `json:"max_open_conn"` // 允许的最大连接数, 即 生产者+消费者最多有 MaxOpenConn 个 BufferSize int `json:"buffer_size"` // 生产者消息历史记录最大数量 HeartbeatTimeout float64 `json:"heartbeat_timeout"` Logger logger.Iface `json:"-"` Token string `json:"-"` // 注册认证密钥 EventHandler EventHandler `json:"-"` // 事件触发器 Ctx context.Context `json:"-"` // contains filtered or unexported fields }
type Consumer ¶
type Consumer struct { Addr string `json:"addr"` Conf *ConsumerConfig `json:"conf"` Conn transfer.Conn `json:"-"` // contains filtered or unexported fields }
type ConsumerConfig ¶
type ConsumerTopic ¶ added in v0.3.7
type ConsumerTopic struct { Addr string `json:"addr" description:"连接地址"` Topics []string `json:"topics" description:"订阅的主题名列表"` }
ConsumerTopic 消费者订阅的topic
type DefaultEventHandler ¶ added in v0.3.3
type DefaultEventHandler struct{}
func (DefaultEventHandler) OnCMConsumed ¶ added in v0.3.3
func (e DefaultEventHandler) OnCMConsumed(_ *HistoryRecord)
func (DefaultEventHandler) OnConsumerClosed ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerClosed(_ string)
func (DefaultEventHandler) OnConsumerHeartbeatTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerHeartbeatTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnConsumerRegister ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerRegister(_ string)
func (DefaultEventHandler) OnConsumerRegisterTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerRegisterTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnFrameParseError ¶ added in v0.3.3
func (e DefaultEventHandler) OnFrameParseError(_ *proto.TransferFrame, _ transfer.Conn)
func (DefaultEventHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (e DefaultEventHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) error
func (DefaultEventHandler) OnProducerClosed ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerClosed(_ string)
func (DefaultEventHandler) OnProducerHeartbeatTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerHeartbeatTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnProducerRegister ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerRegister(_ string)
func (DefaultEventHandler) OnProducerRegisterTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerRegisterTimeout(event TimeoutEvent)
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) BindMessageHandler ¶
func (e *Engine) BindMessageHandler(m proto.Message, ack proto.Message, handler HookHandler, text string) error
BindMessageHandler 绑定一个自实现的消息处理器,
参数handler为收到此消息后的同步处理函数, 如果需要在处理完成之后向客户端返回消息,则直接就地修改frame对象, HookHandler 的第一个参数为接收到的消息帧, 可通过 proto.TransferFrame.Unmarshal 方法解码, 第二个参数为当前的客户端连接, 此方法返回"处理是否正确"一个参数, 若定义了 ack 则需要返回错误消息给客户端 @param m proto.Message 实现了 proto.Message 接口的自定义消息, 不允许与内置消息冲突, 可通过 proto.IsMessageDefined 判断 @param handler HookHandler 消息处理方法 @param ack proto.Message 是否需要返回响应给客户端 @param text string 此消息的摘要名称
func (*Engine) EventHandler ¶
func (e *Engine) EventHandler() EventHandler
func (*Engine) GetTopicOffset ¶
GetTopicOffset 查询指定topic当前的消息偏移量
func (*Engine) HeartbeatInterval ¶ added in v0.3.3
HeartbeatInterval 心跳周期间隔
func (*Engine) IsTokenCorrect ¶ added in v0.3.2
IsTokenCorrect 判断客户端的token是否正确,若未开启token验证,则始终正确
func (*Engine) ProducerSendInterval ¶
ProducerSendInterval 允许生产者发送数据间隔
func (*Engine) QueryConsumer ¶
QueryConsumer 查询消费者记录, 若未注册则返回nil
func (*Engine) QueryProducer ¶
QueryProducer 查询生产者记录, 若未注册则返回nil
func (*Engine) RangeConsumer ¶
RangeConsumer 遍历连接的消费者, if false returned, for-loop will stop
func (*Engine) RangeProducer ¶
RangeProducer 遍历连接的生产者, if false returned, for-loop will stop
func (*Engine) RangeTopic ¶
RangeTopic if false returned, for-loop will stop
func (*Engine) ReplaceTransfer ¶
ReplaceTransfer 替换传输层实现
func (*Engine) SetCryptoPlan ¶ added in v0.3.4
SetCryptoPlan 设置加密方案
@param option string 加密方案, 支持token/no (令牌加密和不加密) @param key []string 其他加密参数
func (*Engine) SetEventHandler ¶ added in v0.3.2
func (e *Engine) SetEventHandler(handler EventHandler) *Engine
SetEventHandler 设置事件触发器
func (*Engine) SetTopicHistoryBufferSize ¶
SetTopicHistoryBufferSize 设置topic历史数据缓存大小, 对于修改前已经创建的topic不受影响
@param size int 历史数据缓存大小,[1, 10000)
func (*Engine) TokenCrypto ¶ added in v0.3.3
func (e *Engine) TokenCrypto() *proto.TokenCrypto
TokenCrypto Token加解密器,亦可作为全局加解密器
type EventHandler ¶
type EventHandler interface { // OnFrameParseError 当来自客户端消息帧解析出错时触发的事件(同步调用) OnFrameParseError(frame *proto.TransferFrame, con transfer.Conn) // OnConsumerRegister 当消费者注册成功时触发的事件(异步调用) OnConsumerRegister(addr string) // OnProducerRegister 当生产者注册成功时触发的事件(异步调用) OnProducerRegister(addr string) // OnConsumerClosed 当消费者关闭连接时触发的事件(异步调用) OnConsumerClosed(addr string) // OnProducerClosed 当生产者关闭连接时触发的事件(异步调用) OnProducerClosed(addr string) // OnConsumerHeartbeatTimeout 当消费者心跳超时时触发的事件(异步调用) OnConsumerHeartbeatTimeout(event TimeoutEvent) // OnProducerHeartbeatTimeout 当生产者心跳超时时触发的事件(异步调用) OnProducerHeartbeatTimeout(event TimeoutEvent) // OnConsumerRegisterTimeout 当消费者连接成功后不注册引发的超时事件(异步调用) OnConsumerRegisterTimeout(event TimeoutEvent) // OnProducerRegisterTimeout 当生产者连接成功后不注册引发的超时事件(异步调用) OnProducerRegisterTimeout(event TimeoutEvent) // OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件(同步调用) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) error // OnCMConsumed 当一条消息被消费成功(成功发送给全部消费者)后触发的事件(同步调用) OnCMConsumed(record *HistoryRecord) }
EventHandler 事件触发器
type FlowHandler ¶ added in v0.3.2
type HistoryRecord ¶
type HistoryRecordStatus ¶ added in v0.3.7
type HistoryRecordStatus string
const (
BuildFailed HistoryRecordStatus = ""
)
type Hook ¶
type Hook struct { Type proto.MessageType // 据此判断消息是否已实现 ACKDefined bool // 消息定义处,定义需要有返回值 Handler HookHandler }
type HookHandler ¶
type HookHandler func(frame *proto.TransferFrame, con transfer.Conn) error
HookHandler 消息处理方法
type Monitor ¶ added in v0.3.3
Monitor 监视器: 1. 检测连接成功但不注册的客户端; 2. 检测心跳超时的客户端; 超时时主动断开连接
func (*Monitor) OnClientClosed ¶ added in v0.3.3
OnClientClosed 连接关闭,清空时间信息
func (*Monitor) OnClientConnected ¶ added in v0.3.3
func (*Monitor) OnClientHeartbeat ¶ added in v0.3.3
func (*Monitor) OnClientRegistered ¶ added in v0.3.3
func (*Monitor) ReadClientTimeInfo ¶ added in v0.3.3
type Producer ¶
type Producer struct { Addr string `json:"addr"` Conf *ProducerConfig `json:"conf"` Conn transfer.Conn `json:"-"` // contains filtered or unexported fields }
func (*Producer) NeedConfirm ¶
type ProducerConfig ¶
type Statistic ¶ added in v0.3.7
type Statistic struct {
// contains filtered or unexported fields
}
func (Statistic) ConsumerTopics ¶ added in v0.3.7
func (k Statistic) ConsumerTopics() []*ConsumerTopic
ConsumerTopics 获取消费者订阅的主题名
func (Statistic) LatestRecord ¶ added in v0.3.7
func (k Statistic) LatestRecord() []*HistoryRecord
LatestRecord 获取topic的最新消息记录
func (Statistic) TopicConsumers ¶ added in v0.3.7
func (k Statistic) TopicConsumers() []*TopicConsumer
TopicConsumers 获取topic内的消费者连接
func (Statistic) TopicsName ¶ added in v0.3.7
TopicsName 获取当前全部Topic名称
func (Statistic) TopicsOffset ¶ added in v0.3.7
func (k Statistic) TopicsOffset() []*TopicOffset
TopicsOffset 获取全部的Topic以及相应的偏移量
type TimeInfo ¶ added in v0.3.3
type TimeInfo struct { Addr string `json:"addr"` LinkType proto.LinkType `json:"link_type"` ConnectedAt int64 `json:"connected_at"` // 连接成功时间戳 RegisteredAt int64 `json:"registered_at"` // 注册成功时间戳 HeartbeatAt int64 `json:"heartbeat_at"` // 最近的一个心跳时间戳 }
TimeInfo 与监视器有关的时间信息
func (*TimeInfo) IsRegistered ¶ added in v0.3.3
type TimeoutEvent ¶ added in v0.3.3
type TimeoutEvent struct { Addr string `json:"addr,omitempty"` EventType TimeoutEventType `json:"event_type,omitempty"` LinkType proto.LinkType `json:"link_type,omitempty"` TimeoutInterval float64 `json:"timeout_interval,omitempty"` ConnectedAt int64 `json:"connected_at"` TimeoutAt int64 `json:"timeout_at"` }
TimeoutEvent 超时事件
type TimeoutEventType ¶ added in v0.3.3
type TimeoutEventType string
const ( HeartbeatTimeoutEvent TimeoutEventType = "HEARTBEAT_TIMEOUT" RegisterTimeoutEvent TimeoutEventType = "REGISTER_TIMEOUT" )
type Topic ¶
type Topic struct { Name []byte `json:"name"` // 唯一标识 HistorySize int `json:"history_size"` // 生产者消息缓冲区大小 Offset uint64 `json:"offset"` // 当前数据偏移量,仅用于模糊显示 // contains filtered or unexported fields }
func (*Topic) LatestMessage ¶ added in v0.3.7
func (t *Topic) LatestMessage() *HistoryRecord
LatestMessage 最新的消息记录
func (*Topic) RangeConsumer ¶ added in v0.3.7
RangeConsumer 逐个迭代内部消费者
func (*Topic) SetOnConsumed ¶ added in v0.3.4
func (t *Topic) SetOnConsumed(onConsumed func(record *HistoryRecord)) *Topic
type TopicConsumer ¶ added in v0.3.7
type TopicConsumer struct { Name string `json:"name" description:"名称"` Consumers []string `json:"consumers" description:"消费者连接"` }
TopicConsumer topic内的消费者