engine

package
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrConsumerNotRegister = errors.New("consumer not register")
	ErrProducerNotRegister = errors.New("producer not register")
	ErrPMNotFound          = errors.New("producer-message not found in frame")
	// ErrNoNeedToReply 不再回复响应给客户端
	ErrNoNeedToReply = errors.New("no need to reply to the client")
)

Functions

This section is empty.

Types

type ChainArgs added in v0.3.2

type ChainArgs struct {
	// contains filtered or unexported fields
}

func (*ChainArgs) ReplyClient added in v0.3.3

func (args *ChainArgs) ReplyClient() bool

ReplyClient 是否需要回复客户端, 只要未显示设置不回复,均需要回复响应给客户端

func (*ChainArgs) Reset added in v0.3.3

func (args *ChainArgs) Reset()

func (*ChainArgs) SetError added in v0.3.4

func (args *ChainArgs) SetError(err error) *ChainArgs

SetError 记录错误

func (*ChainArgs) StopError added in v0.3.4

func (args *ChainArgs) StopError() error

StopError 判断是否处理错误

type Config

type Config struct {
	Host             string          `json:"host"`
	Port             string          `json:"port"`
	MaxOpenConn      int             `json:"max_open_conn"` // 允许的最大连接数, 即 生产者+消费者最多有 MaxOpenConn 个
	BufferSize       int             `json:"buffer_size"`   // 生产者消息历史记录最大数量
	HeartbeatTimeout float64         `json:"heartbeat_timeout"`
	Logger           logger.Iface    `json:"-"`
	Token            string          `json:"-"` // 注册认证密钥
	EventHandler     EventHandler    `json:"-"` // 事件触发器
	Ctx              context.Context `json:"-"`
	// contains filtered or unexported fields
}

type Consumer

type Consumer struct {
	Addr string          `json:"addr"`
	Conf *ConsumerConfig `json:"conf"`
	Conn transfer.Conn   `json:"-"`
	// contains filtered or unexported fields
}

func (*Consumer) Index added in v0.3.3

func (c *Consumer) Index() int

func (*Consumer) IsFree added in v0.3.3

func (c *Consumer) IsFree() bool

func (*Consumer) NeedConfirm

func (c *Consumer) NeedConfirm() bool

NeedConfirm 是否需要返回确认消息给客户端

func (*Consumer) Send

func (c *Consumer) Send(msg proto.Message) error

Send 向消费者客户端推送消息, 此操作是线程安全的

func (*Consumer) SetConn added in v0.3.3

func (c *Consumer) SetConn(r transfer.Conn) *Consumer

type ConsumerConfig

type ConsumerConfig struct {
	Topics []string      `json:"topics"`
	Ack    proto.AckType `json:"ack"`
}

type ConsumerTopic added in v0.3.7

type ConsumerTopic struct {
	Addr   string   `json:"addr" description:"连接地址"`
	Topics []string `json:"topics" description:"订阅的主题名列表"`
}

ConsumerTopic 消费者订阅的topic

type DefaultEventHandler added in v0.3.3

type DefaultEventHandler struct{}

func (DefaultEventHandler) OnCMConsumed added in v0.3.3

func (e DefaultEventHandler) OnCMConsumed(_ *HistoryRecord)

func (DefaultEventHandler) OnConsumerClosed added in v0.3.3

func (e DefaultEventHandler) OnConsumerClosed(_ string)

func (DefaultEventHandler) OnConsumerHeartbeatTimeout added in v0.3.3

func (e DefaultEventHandler) OnConsumerHeartbeatTimeout(event TimeoutEvent)

func (DefaultEventHandler) OnConsumerRegister added in v0.3.3

func (e DefaultEventHandler) OnConsumerRegister(_ string)

func (DefaultEventHandler) OnConsumerRegisterTimeout added in v0.3.3

func (e DefaultEventHandler) OnConsumerRegisterTimeout(event TimeoutEvent)

func (DefaultEventHandler) OnFrameParseError added in v0.3.3

func (e DefaultEventHandler) OnFrameParseError(_ *proto.TransferFrame, _ transfer.Conn)

func (DefaultEventHandler) OnNotImplementMessageType added in v0.3.3

func (e DefaultEventHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) error

func (DefaultEventHandler) OnProducerClosed added in v0.3.3

func (e DefaultEventHandler) OnProducerClosed(_ string)

func (DefaultEventHandler) OnProducerHeartbeatTimeout added in v0.3.3

func (e DefaultEventHandler) OnProducerHeartbeatTimeout(event TimeoutEvent)

func (DefaultEventHandler) OnProducerRegister added in v0.3.3

func (e DefaultEventHandler) OnProducerRegister(_ string)

func (DefaultEventHandler) OnProducerRegisterTimeout added in v0.3.3

func (e DefaultEventHandler) OnProducerRegisterTimeout(event TimeoutEvent)

type EPool added in v0.3.3

type EPool struct {
	// contains filtered or unexported fields
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func New

func New(cs ...Config) *Engine

New 创建一个新的服务器

func (*Engine) AddTopic

func (e *Engine) AddTopic(name []byte) *Topic

AddTopic 添加一个新的topic

func (*Engine) BindMessageHandler

func (e *Engine) BindMessageHandler(m proto.Message, ack proto.Message, handler HookHandler, text string) error

BindMessageHandler 绑定一个自实现的消息处理器,

参数handler为收到此消息后的同步处理函数, 如果需要在处理完成之后向客户端返回消息,则直接就地修改frame对象,
	HookHandler 的第一个参数为接收到的消息帧, 可通过 proto.TransferFrame.Unmarshal 方法解码, 第二个参数为当前的客户端连接,
	此方法返回"处理是否正确"一个参数, 若定义了 needAck 则需要返回错误消息给客户端

@param	m		proto.Message	实现了 proto.Message 接口的自定义消息, 不允许与内置消息冲突, 可通过 proto.IsMessageDefined 判断
@param	handler	HookHandler		消息处理方法
@param	ack 	proto.Message	是否需要返回响应给客户端
@param	text 	string 			此消息的摘要名称

func (*Engine) Crypto added in v0.3.3

func (e *Engine) Crypto() proto.Crypto

Crypto 全局加解密器

func (*Engine) Ctx added in v0.3.3

func (e *Engine) Ctx() context.Context

func (*Engine) EventHandler

func (e *Engine) EventHandler() EventHandler

func (*Engine) GetTopic

func (e *Engine) GetTopic(name []byte) *Topic

GetTopic 获取topic,并在不存在时自动新建一个topic

func (*Engine) GetTopicOffset

func (e *Engine) GetTopicOffset(name []byte) uint64

GetTopicOffset 查询指定topic当前的消息偏移量

func (*Engine) HeartbeatInterval added in v0.3.3

func (e *Engine) HeartbeatInterval() float64

HeartbeatInterval 心跳周期间隔

func (*Engine) IsTokenCorrect added in v0.3.2

func (e *Engine) IsTokenCorrect(token string) bool

IsTokenCorrect 判断客户端的token是否正确,若未开启token验证,则始终正确

func (*Engine) Logger

func (e *Engine) Logger() logger.Iface

func (*Engine) NeedToken added in v0.3.1

func (e *Engine) NeedToken() bool

NeedToken 是否需要密钥认证

func (*Engine) ProducerSendInterval

func (e *Engine) ProducerSendInterval() time.Duration

ProducerSendInterval 允许生产者发送数据间隔

func (*Engine) Publisher

func (e *Engine) Publisher(msg *proto.PMessage) uint64

Publisher 发布消息,并返回此消息在当前topic中的偏移量

func (*Engine) QueryConsumer

func (e *Engine) QueryConsumer(addr string) (*Consumer, bool)

QueryConsumer 查询消费者记录, 若未注册则返回nil

func (*Engine) QueryProducer

func (e *Engine) QueryProducer(addr string) (*Producer, bool)

QueryProducer 查询生产者记录, 若未注册则返回nil

func (*Engine) RangeConsumer

func (e *Engine) RangeConsumer(fn func(c *Consumer) bool)

RangeConsumer 遍历连接的消费者, if false returned, for-loop will stop

func (*Engine) RangeProducer

func (e *Engine) RangeProducer(fn func(p *Producer) bool)

RangeProducer 遍历连接的生产者, if false returned, for-loop will stop

func (*Engine) RangeTopic

func (e *Engine) RangeTopic(fn func(topic *Topic) bool)

RangeTopic if false returned, for-loop will stop

func (*Engine) RemoveConsumer

func (e *Engine) RemoveConsumer(addr string)

RemoveConsumer 删除一个消费者

func (*Engine) RemoveProducer

func (e *Engine) RemoveProducer(addr string)

RemoveProducer 删除一个生产者

func (*Engine) ReplaceTransfer

func (e *Engine) ReplaceTransfer(transfer transfer.Transfer) *Engine

ReplaceTransfer 替换传输层实现

func (*Engine) Serve added in v0.3.1

func (e *Engine) Serve() error

Serve 阻塞运行

func (*Engine) SetCrypto added in v0.3.3

func (e *Engine) SetCrypto(crypto proto.Crypto) *Engine

SetCrypto 修改全局加解密器, 必须在 Serve 之前设置

func (*Engine) SetCryptoPlan added in v0.3.4

func (e *Engine) SetCryptoPlan(option string, key ...string) *Engine

SetCryptoPlan 设置加密方案

@param	option	string		加密方案, 支持token/no (令牌加密和不加密)
@param	key 	[]string	其他加密参数

func (*Engine) SetEventHandler added in v0.3.2

func (e *Engine) SetEventHandler(handler EventHandler) *Engine

SetEventHandler 设置事件触发器

func (*Engine) SetTopicHistoryBufferSize

func (e *Engine) SetTopicHistoryBufferSize(size int) *Engine

SetTopicHistoryBufferSize 设置topic历史数据缓存大小, 对于修改前已经创建的topic不受影响

@param size	int 历史数据缓存大小,[1, 10000)

func (*Engine) Stat added in v0.3.7

func (e *Engine) Stat() *Statistic

func (*Engine) Stop added in v0.3.1

func (e *Engine) Stop()

func (*Engine) TokenCrypto added in v0.3.3

func (e *Engine) TokenCrypto() *proto.TokenCrypto

TokenCrypto Token加解密器,亦可作为全局加解密器

type EventHandler

type EventHandler interface {
	// OnFrameParseError 当来自客户端消息帧解析出错时触发的事件(同步调用)
	OnFrameParseError(frame *proto.TransferFrame, con transfer.Conn)
	// OnConsumerRegister 当消费者注册成功时触发的事件(异步调用)
	OnConsumerRegister(addr string)
	// OnProducerRegister 当生产者注册成功时触发的事件(异步调用)
	OnProducerRegister(addr string)
	// OnConsumerClosed 当消费者关闭连接时触发的事件(异步调用)
	OnConsumerClosed(addr string)
	// OnProducerClosed 当生产者关闭连接时触发的事件(异步调用)
	OnProducerClosed(addr string)
	// OnConsumerHeartbeatTimeout 当消费者心跳超时触发的事件(异步调用)
	OnConsumerHeartbeatTimeout(event TimeoutEvent)
	// OnProducerHeartbeatTimeout 当生产者心跳超时时触发的事件(异步调用)
	OnProducerHeartbeatTimeout(event TimeoutEvent)
	// OnConsumerRegisterTimeout 当消费者连接成功后不注册引发的超时事件(异步调用)
	OnConsumerRegisterTimeout(event TimeoutEvent)
	// OnProducerRegisterTimeout 当消生产者连接成功后不注册引发的超时事件(异步调用)
	OnProducerRegisterTimeout(event TimeoutEvent)
	// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件(同步调用)
	OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) error
	// OnCMConsumed 当一个消费者被消费成功(成功发送给全部消费者)后时触发的事件(同步调用)
	OnCMConsumed(record *HistoryRecord)
}

EventHandler 事件触发器

type FlowHandler added in v0.3.2

type FlowHandler func(args *ChainArgs) (stop bool)

type HistoryRecord

type HistoryRecord struct {
	Topic       []byte            // 历史记录所属的topic
	Key         []byte            //
	Value       []byte            //
	Offset      uint64            // 历史记录所属的偏移量
	MessageType proto.MessageType // CM协议类型,以此来反序列化
	Time        int64             // 历史记录创建时间戳,而非CM被创建的事件戳
	Error       string            //
	// contains filtered or unexported fields
}

type HistoryRecordStatus added in v0.3.7

type HistoryRecordStatus string
const (
	BuildFailed HistoryRecordStatus = ""
)

type Hook

type Hook struct {
	Type       proto.MessageType // 据此判断消息是否已实现
	ACKDefined bool              // 消息定义处,定义需要有返回值
	Handler    HookHandler
}

type HookHandler

type HookHandler func(frame *proto.TransferFrame, con transfer.Conn) error

HookHandler 消息处理方法

type Monitor added in v0.3.3

type Monitor struct {
	cronjob.Job
	// contains filtered or unexported fields
}

Monitor 监视器 1. 检测连接成功但不注册的客户端 2. 检测心跳超时的客户端 超时时主动断开连接

func (*Monitor) Do added in v0.3.3

func (k *Monitor) Do(ctx context.Context) error

func (*Monitor) Interval added in v0.3.3

func (k *Monitor) Interval() time.Duration

func (*Monitor) OnClientClosed added in v0.3.3

func (k *Monitor) OnClientClosed(addr string)

OnClientClosed 连接关闭,清空时间信息

func (*Monitor) OnClientConnected added in v0.3.3

func (k *Monitor) OnClientConnected(addr string)

func (*Monitor) OnClientHeartbeat added in v0.3.3

func (k *Monitor) OnClientHeartbeat(addr string)

func (*Monitor) OnClientRegistered added in v0.3.3

func (k *Monitor) OnClientRegistered(addr string, linkType proto.LinkType)

func (*Monitor) OnStartup added in v0.3.3

func (k *Monitor) OnStartup()

func (*Monitor) ReadClientTimeInfo added in v0.3.3

func (k *Monitor) ReadClientTimeInfo(addr string, linkType proto.LinkType) *TimeInfo

func (*Monitor) String added in v0.3.3

func (k *Monitor) String() string

type Producer

type Producer struct {
	Addr string          `json:"addr"`
	Conf *ProducerConfig `json:"conf"`
	Conn transfer.Conn   `json:"-"`
	// contains filtered or unexported fields
}

func (*Producer) Index added in v0.3.3

func (p *Producer) Index() int

func (*Producer) IsFree added in v0.3.3

func (p *Producer) IsFree() bool

func (*Producer) NeedConfirm

func (p *Producer) NeedConfirm() bool

func (*Producer) SetConn added in v0.3.3

func (p *Producer) SetConn(r transfer.Conn) *Producer

type ProducerConfig

type ProducerConfig struct {
	Ack proto.AckType `json:"ack"`
	// 定时器间隔,单位ms,仅生产者有效,生产者需要按照此间隔发送帧消息
	TickerInterval time.Duration `json:"ticker_duration"`
}

type Statistic added in v0.3.7

type Statistic struct {
	// contains filtered or unexported fields
}

func (Statistic) ConsumerTopics added in v0.3.7

func (k Statistic) ConsumerTopics() []*ConsumerTopic

ConsumerTopics 获取消费者订阅的主题名

func (Statistic) LatestRecord added in v0.3.7

func (k Statistic) LatestRecord() []*HistoryRecord

LatestRecord 获取topic的最新消息记录

func (Statistic) Producers added in v0.3.7

func (k Statistic) Producers() []string

Producers 获取全部生产者连接信息

func (Statistic) TopicConsumers added in v0.3.7

func (k Statistic) TopicConsumers() []*TopicConsumer

TopicConsumers 获取topic内的消费者连接

func (Statistic) TopicsName added in v0.3.7

func (k Statistic) TopicsName() []string

TopicsName 获取当前全部Topic名称

func (Statistic) TopicsOffset added in v0.3.7

func (k Statistic) TopicsOffset() []*TopicOffset

TopicsOffset 获取全部的Topic以及响应的偏移量

type TimeInfo added in v0.3.3

type TimeInfo struct {
	Addr         string         `json:"addr"`
	LinkType     proto.LinkType `json:"link_type"`
	ConnectedAt  int64          `json:"connected_at"`  // 连接成功时间戳
	RegisteredAt int64          `json:"registered_at"` // 注册成功时间戳
	HeartbeatAt  int64          `json:"heartbeat_at"`  // 最近的一个心跳时间戳
}

TimeInfo 关于监视器有关的时间信息

func (*TimeInfo) IsFree added in v0.3.3

func (t *TimeInfo) IsFree() bool

func (*TimeInfo) IsRegistered added in v0.3.3

func (t *TimeInfo) IsRegistered() bool

func (*TimeInfo) Reset added in v0.3.3

func (t *TimeInfo) Reset()

type TimeoutEvent added in v0.3.3

type TimeoutEvent struct {
	Addr            string           `json:"addr,omitempty"`
	EventType       TimeoutEventType `json:"event_type,omitempty"`
	LinkType        proto.LinkType   `json:"link_type,omitempty"`
	TimeoutInterval float64          `json:"timeout_interval,omitempty"`
	ConnectedAt     int64            `json:"connected_at"`
	TimeoutAt       int64            `json:"timeout_at"`
}

TimeoutEvent 超时事件

type TimeoutEventType added in v0.3.3

type TimeoutEventType string
const (
	HeartbeatTimeoutEvent TimeoutEventType = "HEARTBEAT_TIMEOUT"
	RegisterTimeoutEvent  TimeoutEventType = "REGISTER_TIMEOUT"
)

type Topic

type Topic struct {
	Name        []byte `json:"name"`         // 唯一标识
	HistorySize int    `json:"history_size"` // 生产者消息缓冲区大小
	Offset      uint64 `json:"offset"`       // 当前数据偏移量,仅用于模糊显示
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name []byte, bufferSize, historySize int) *Topic

func (*Topic) AddConsumer

func (t *Topic) AddConsumer(con *Consumer)

AddConsumer 添加一个消费者

func (*Topic) LatestMessage added in v0.3.7

func (t *Topic) LatestMessage() *HistoryRecord

LatestMessage 最新的消息记录

func (*Topic) Publisher

func (t *Topic) Publisher(pm *proto.PMessage) uint64

Publisher 发布消费者消息,此处会将来自生产者的消息转换成消费者消息

func (*Topic) RangeConsumer added in v0.3.7

func (t *Topic) RangeConsumer(fn func(c *Consumer))

RangeConsumer 逐个迭代内部消费者

func (*Topic) RemoveConsumer

func (t *Topic) RemoveConsumer(addr string)

RemoveConsumer 移除一个消费者

func (*Topic) SetCrypto added in v0.3.4

func (t *Topic) SetCrypto(crypto proto.Crypto) *Topic

func (*Topic) SetOnConsumed added in v0.3.4

func (t *Topic) SetOnConsumed(onConsumed func(record *HistoryRecord)) *Topic

type TopicConsumer added in v0.3.7

type TopicConsumer struct {
	Name      string   `json:"name" description:"名称"`
	Consumers []string `json:"consumers" description:"消费者连接"`
}

TopicConsumer topic内的消费者

type TopicOffset added in v0.3.7

type TopicOffset struct {
	Name   string `json:"name" description:"名称"`
	Offset uint64 `json:"offset" description:"最新的消息偏移量"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL