Documentation
¶
Overview ¶
生产者消费者模式(Producer-consumer problem)帮助工具.
Index ¶
- Variables
- func SerializeWithJSON() optparams.Option[Options]
- func SerializeWithMsgpack() optparams.Option[Options]
- func ToBytes(spt SerializeProtocolType, payload interface{}) ([]byte, error)
- func ToXAddArgsValue(spt SerializeProtocolType, payload interface{}) (interface{}, error)
- func WithEventParser(fn EventParser) optparams.Option[ListenOptions]
- func WithID(id string) optparams.Option[PublishOptions]
- func WithLimit(limit int64) optparams.Option[PublishOptions]
- func WithMaxlen(n int64) optparams.Option[PublishOptions]
- func WithMinID(minid string) optparams.Option[PublishOptions]
- func WithNoMkStream() optparams.Option[PublishOptions]
- func WithParallelHanddler() optparams.Option[ListenOptions]
- func WithStrictMode() optparams.Option[PublishOptions]
- func WithTopicStartAt(topic string, t time.Time) optparams.Option[ListenOptions]
- func WithTopicStartPosition(topic, flag string) optparams.Option[ListenOptions]
- func WithTopicsStartPositionMap(setting map[string]string) optparams.Option[ListenOptions]
- func WithUUIDSnowflake() optparams.Option[Options]
- func WithUUIDSonyflake() optparams.Option[Options]
- func WithUUIDv4() optparams.Option[Options]
- type ConsumerABC
- type ConsumerInterface
- type Event
- type EventHanddler
- type EventParser
- type ListenOptions
- type Options
- type ProducerConsumerABC
- type ProducerInterface
- type PublishOptions
- type SerializeProtocolType
Constants ¶
This section is empty.
Variables ¶
var DefaultListenOpt = ListenOptions{ Parser: DefaultParser, TopicStarts: map[string]string{}, }
var DefaultPublishOpt = PublishOptions{}
var Defaultopt = Options{ SerializeProtocol: SerializeProtocol_JSON, UUIDType: idgener.IDGEN_UUIDV4, }
var ErrMapPayloadCanNotCast = errors.New("payload can not cast map as map[string]interface{}")
ErrMapPayloadCanNotCast map数据不能被转换
var ErrNotSupportChanAsPayload = errors.New("not support chan as payload")
ErrNotSupportChanAsPayload chan数据不能作为payload
var ErrNotSupportSliceAsPayload = errors.New("not support slice as payload")
ErrNotSupportSliceAsPayload 不支持slice作为负载
var ErrUnSupportSerializeProtocol = errors.New("unsupported serialize protocol")
ErrUnSupportSerializeProtocol 未支持的序列化协议
Functions ¶
func SerializeWithJSON ¶
SerializeWithJSON 使用JSON作为序列化反序列化的协议
func SerializeWithMsgpack ¶
SerializeWithMsgpack 使用Msgpack作为序列化反序列化的协议
func ToBytes ¶
func ToBytes(spt SerializeProtocolType, payload interface{}) ([]byte, error)
func ToXAddArgsValue ¶
func ToXAddArgsValue(spt SerializeProtocolType, payload interface{}) (interface{}, error)
func WithEventParser ¶
func WithEventParser(fn EventParser) optparams.Option[ListenOptions]
WithEventParser 设置监听时用于将消息解析为Event的解析函数
func WithMaxlen ¶
func WithMaxlen(n int64) optparams.Option[PublishOptions]
func WithNoMkStream ¶
func WithNoMkStream() optparams.Option[PublishOptions]
WithNoMkStream stream专用
func WithParallelHanddler ¶
func WithParallelHanddler() optparams.Option[ListenOptions]
WithParallelHanddler 并行执行注册的handdler
func WithStrictMode ¶
func WithStrictMode() optparams.Option[PublishOptions]
func WithTopicStartAt ¶
WithTopicStartAt stream消费者专用,用于设定指定topic消费起始时间
func WithTopicStartPosition ¶
func WithTopicStartPosition(topic, flag string) optparams.Option[ListenOptions]
WithTopicStartPosition stream消费者专用,用于设定指定topic消费起始位置
func WithTopicsStartPositionMap ¶
func WithTopicsStartPositionMap(setting map[string]string) optparams.Option[ListenOptions]
WithTopicsStartPositionMap stream消费者专用,用于设定指定复数topic消费起始位置
func WithUUIDSnowflake ¶
WithUUIDSnowflake 使用snowflake作为uuid的生成器
func WithUUIDSonyflake ¶
WithUUIDSonyflake 使用sonyflake作为uuid的生成器
Types ¶
type ConsumerABC ¶
type ConsumerABC struct { Handdlers map[string][]EventHanddler Handdlerslock sync.RWMutex *ProducerConsumerABC }
ConsumerABC 消费者的基类 定义了回调函数的注册操作和执行操作
func NewConsumerABC ¶
func NewConsumerABC(opts ...optparams.Option[Options]) *ConsumerABC
func (*ConsumerABC) HanddlerEvent ¶
func (c *ConsumerABC) HanddlerEvent(asyncHanddler bool, evt *Event)
HanddlerEvent 调用回调函数处理消息 @params asyncHanddler bool 是否异步执行回调函数 @params evt *Event 待处理的消息
func (*ConsumerABC) RegistHandler ¶
func (c *ConsumerABC) RegistHandler(topic string, fn EventHanddler) error
RegistHandler 将回调函数注册到指定topic上 @params topic string 注册的topic,topic可以是具体的key也可以是*,*表示监听所有消息 @params fn EventHanddler 注册到topic上的回调函数
func (*ConsumerABC) UnRegistHandler ¶
func (c *ConsumerABC) UnRegistHandler(topic string) error
UnRegistHandler 删除特定topic上注册的回调函数 @params topic string 要取消注册回调的topic,注意`*`取消的只是`*`类型的回调并不是全部取消,要全部取消请使用空字符串
type ConsumerInterface ¶
type ConsumerInterface interface { //RegistHandler 注册特定topic的执行函数 //@params topic string 指定目标topic,`*`为全部topic //@params fn EventHanddler 事件触发时执行的函数 RegistHandler(topic string, fn EventHanddler) error //UnRegistHandler 取消注册特定topic的执行函数 //@params topic string 指定目标topic,`*`为全部topic UnRegistHandler(topic string) error //Listen 开始监听 //@params topics string 指定监听的目标topic,使用`,`分隔表示多个topic //@params opts ...optparams.Option[ListenOptions] 监听时的一些配置,具体看listenoption.go说明 Listen(topics string, opts ...optparams.Option[ListenOptions]) error //StopListening 停止监听 StopListening() error }
ConsumerInterface 消费者对象的接口
type Event ¶
type Event struct { Topic string `json:"topic,omitempty" msgpack:"topic,omitempty"` Sender string `json:"sender,omitempty" msgpack:"sender,omitempty"` EventTime int64 `json:"event_time,omitempty" msgpack:"event_time,omitempty"` //毫秒级时间戳 EventID string `json:"event_id,omitempty" msgpack:"event_id,omitempty"` Payload interface{} `json:"payload" msgpack:"payload"` }
Event 消息对象
func DefaultParser ¶
func DefaultParser(SerializeProtocol SerializeProtocolType, topic, eventID, payloadstr string, payload map[string]interface{}) (*Event, error)
DefaultParser 默认的消息处理函数,负载会被解析为 map[string]interface{}
type EventHanddler ¶
EventHanddler 处理消息的回调函数 @params msg *Event Event对象
type EventParser ¶
type EventParser func(SerializeProtocol SerializeProtocolType, topic, eventID, payloadstr string, payload map[string]interface{}) (*Event, error)
EventParser 用于将负载字符串转化为event的函数 规定eventID不为""时解析流的消息,用到topic, eventID, payload 规定eventID为""时解析除流之外的消息,用到SerializeProtocol,topic, payloadstr
type ListenOptions ¶
type ListenOptions struct { ParallelHanddler bool Parser EventParser TopicStarts map[string]string //用于指定特定topic的监听起始位置 }
ListenOptions 消费端listen方法的配置
type Options ¶
type Options struct { SerializeProtocol SerializeProtocolType // ClientID string UUIDType idgener.IDGENAlgorithm }
Options broker的配置
type ProducerConsumerABC ¶
type ProducerConsumerABC struct {
Opt Options
}
ProducerConsumerABC 生产者和消费者共用的基类 保存了broker的配置(Opt)
type ProducerInterface ¶
type ProducerInterface interface { //Publish 发布消息 //@params ctx context.Context 发送的上下文配置 //@params topic string 指定发送去的topic //@params payload interface{} 消息负载 Publish(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[PublishOptions]) error //PubEvent 发布事件,事件会包含除负载外的一些其他元信息 //@params ctx context.Context 发送的上下文配置 //@params topic string 指定发送去的topic //@params payload interface{} 消息负载 PubEvent(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[PublishOptions]) (*Event, error) }
ProducerInterface 生产者对象的接口
type PublishOptions ¶
type PublishOptions struct { NoMkStream bool MinID string ID string Limit int64 MaxLen int64 //stream生产者专用,用于设置流的最大长度 Strict bool //stream生产者专用,用于设置流是否为严格模式 }
PublishOptions 生产端publish方法的配置
type SerializeProtocolType ¶
type SerializeProtocolType uint8
SerializeProtocolType 序列化协议的类型
const ( //SerializeProtocol_JSON json作为序列化协议 SerializeProtocol_JSON SerializeProtocolType = iota //SerializeProtocol_MSGPACK messagepack作为序列化协议 SerializeProtocol_MSGPACK )