Documentation ¶
Index ¶
- Constants
- Variables
- type Conn
- type ConnPool
- type Consumer
- func (c *Consumer) Close()
- func (c *Consumer) Subscribe(topic string, filter string, f func(...*msg.Message) error) error
- func (c *Consumer) SubscribeWithCount(topic string, filter string, count int64, f func(...*msg.Message) error) error
- func (c *Consumer) SubscribeWithCountAndHandler(topic string, filter string, count int64, h Handler) error
- func (c *Consumer) SubscribeWithHandler(topic string, filter string, h Handler) error
- func (c *Consumer) UnSubscribe(topic string) error
- type DefaultHandler
- type Handler
- type Producer
- func (p *Producer) BatchPublish(m ...*msg.Message) error
- func (p *Producer) Close()
- func (p *Producer) PublishDirect(topic string, filter string, body []byte) error
- func (p *Producer) PublishDirectPersist(topic string, filter string, body []byte) error
- func (p *Producer) PublishFanout(topic string, filter string, body []byte) error
- func (p *Producer) PublishFanoutPersist(topic string, filter string, body []byte) error
- func (p *Producer) PushDirect(topic string, filter string, body []byte) error
- func (p *Producer) PushDirectPersist(topic string, filter string, body []byte) error
- func (p *Producer) PushFanout(topic string, filter string, body []byte) error
- func (p *Producer) PushFanoutPersist(topic string, filter string, body []byte) error
- func (p *Producer) WaitAck(c *Conn) error
- type RateLimiter
Constants ¶
const (
	ACTIVE = iota //ACTIVE status
	CLOSED        //CLOSE status
)
pool state
const (
	//DefaultPoolSize is 10
	DefaultPoolSize = 10
	//DefaultDialTimeout 5s
	DefaultDialTimeout = time.Second * 5
	//DefaultPoolTimeout 5s
	DefaultPoolTimeout = time.Second * 5
	//DefaultConnPerSecond 500
	DefaultConnPerSecond = 500
)
Variables ¶
var (
	//ErrParamsInvalid means params invalid, please check
	ErrParamsInvalid = errors.New("invalid params")
	//ErrMsgTypeError is msg type error
	ErrMsgTypeError = errors.New("message type error")
)
var (
	//ErrSubscribeTimeout error
	ErrSubscribeTimeout = errors.New("subscribe topic time out")
)
var (
	//ErrTimeout error
	ErrTimeout = errors.New("get conn time out")
)
Functions ¶
This section is empty.
Types ¶
type ConnPool ¶
ConnPool is the definition of a connection pool.
func NewConnPool ¶
func NewConnPool(poolSize int, dialTimeout, poolTimeout time.Duration, addr string, rate int64) *ConnPool
NewConnPool creates a new connection pool
func NewDefaultConnPool ¶
NewDefaultConnPool uses default params.
type Consumer ¶
type Consumer struct {
	Pool *ConnPool
	// contains filtered or unexported fields
}
Consumer instance
func (*Consumer) SubscribeWithCount ¶
func (c *Consumer) SubscribeWithCount(topic string, filter string, count int64, f func(...*msg.Message) error) error
SubscribeWithCount subscribes to a topic with a filter, using the default handler and the given count.
func (*Consumer) SubscribeWithCountAndHandler ¶
func (c *Consumer) SubscribeWithCountAndHandler(topic string, filter string, count int64, h Handler) error
SubscribeWithCountAndHandler subscribes to a topic with a filter, using the given handler and the given count.
func (*Consumer) SubscribeWithHandler ¶
SubscribeWithHandler subscribes to a topic with a filter, using the given handler.
type DefaultHandler ¶
type DefaultHandler struct {
// contains filtered or unexported fields
}
DefaultHandler is the default handler, used by Subscribe method.
type Handler ¶
type Handler interface {
	//Before method
	Before()
	//Handle when Handle return value is not nil, run loop will abort, then After() will be invoked.
	Handle(...*msg.Message) error
	//After method
	After()
}
Handler defines the functions that handle messages
type Producer ¶
type Producer struct {
Pool *ConnPool
}
Producer instance
func (*Producer) BatchPublish ¶
BatchPublish publishes messages in a batch; all the messages must have type MessageType_Publish.
func (*Producer) PublishDirect ¶
PublishDirect publish body to topic with filter, only one subscriber Need Ack
func (*Producer) PublishDirectPersist ¶
PublishDirectPersist publish body to topic with filter, persist, only one subscriber Need Ack
func (*Producer) PublishFanout ¶
PublishFanout publish body to topic with filter, all subscriber Need Ack
func (*Producer) PublishFanoutPersist ¶
PublishFanoutPersist publish body to topic with filter, persist, all subscriber Need Ack
func (*Producer) PushDirect ¶
PushDirect publish body to topic with filter, only one subscriber no Ack
func (*Producer) PushDirectPersist ¶
PushDirectPersist publish body to topic with filter, persist, only one subscriber no Ack
func (*Producer) PushFanout ¶
PushFanout publish body to topic with filter, all subscriber no Ack
func (*Producer) PushFanoutPersist ¶
PushFanoutPersist publish body to topic with filter, persist, all subscriber no Ack
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is based on the Token Bucket algorithm. It supports both blocking and non-blocking acquisition, and it is safe for concurrent use.
func NewRateLimiter ¶
func NewRateLimiter(rate int64, unit time.Duration) *RateLimiter
NewRateLimiter creates a new rate limiter.
func (*RateLimiter) Acquire ¶
func (r *RateLimiter) Acquire()
Acquire blocks until the limiter is available.
func (*RateLimiter) AcquireCount ¶
func (r *RateLimiter) AcquireCount(count int64)
AcquireCount takes count tokens.
func (*RateLimiter) TryAcquire ¶
func (r *RateLimiter) TryAcquire() bool
TryAcquire is Non-blocking, when acquire failed, it returns false.