Versions in this module Expand all Collapse all v2 v2.0.2 Jul 7, 2019 v2.0.0 Jul 7, 2019 Changes in this version + func BuildMQClientID(ip, unitName, instanceName string) string + func IsTag(typ string) bool + func ParseTags(expr string) []string + type Admin interface + Group func() string + type Config struct + HeartbeatBrokerInterval time.Duration + NameServerAddrs []string + PollNameServerInterval time.Duration + type ConsumeDirectlyResult = rpc.ConsumeDirectlyResult + const Commit + const Error + const Later + const ReturnNil + const Rollback + const Success + type ConsumeMessageDirectlyResult = rpc.ConsumeMessageDirectlyResult + type Consumer interface + ConsumeFromWhere func() string + ConsumeMessageDirectly func(msg *message.Ext, broker string) (ConsumeMessageDirectlyResult, error) + Group func() string + Model func() string + NeedUpdateTopicSubscribe func(topic string) bool + ReblanceQueue func() + ResetOffset func(topic string, offsets map[message.Queue]int64) error + RunningInfo func() RunningInfo + SubscribeTopics func() []string + Subscriptions func() []*SubscribeData + Type func() string + UnitMode func() bool + UpdateTopicSubscribe func(topic string, router *route.TopicRouter) + type ExprType string + const ExprAll + const ExprTypeSQL92 + const ExprTypeTag + func (t ExprType) String() string + type FindBrokerResult struct + Addr string + IsSlave bool + Version int32 + type MQClient struct + func New(config *Config, clientID string, logger log.Logger) (c *MQClient, err error) + func (c *MQClient) AdminCount() int + func (c *MQClient) ConsumeMessageDirectly(addr, group, clientID, offsetID string) (ConsumeMessageDirectlyResult, error) + func (c *MQClient) ConsumerCount() int + func (c *MQClient) CreateOrUpdateTopic(addr string, header *rpc.CreateOrUpdateTopicHeader, to time.Duration) error + func (c *MQClient) DeleteTopicInBroker(addr, topic string, to time.Duration) error + func (c *MQClient) DeleteTopicInNamesrv(addr, topic string, to time.Duration) error + func (c *MQClient) FindAnyBrokerAddr(brokerName string) (*FindBrokerResult, error) + func (c *MQClient) FindBrokerAddr(brokerName string, hintBrokerID int32, lock bool) (*FindBrokerResult, error) + func (c *MQClient) GetBrokerClusterInfo(addr string, to time.Duration) (*route.ClusterInfo, error) + func (c *MQClient) GetConsumerIDs(addr, group string, to time.Duration) (ids []string, err error) + func (c *MQClient) GetMasterBrokerAddr(brokerName string) string + func (c *MQClient) GetMasterBrokerAddrs() []string + func (c *MQClient) LockMessageQueues(broker, group string, queues []message.Queue, to time.Duration) ([]message.Queue, error) + func (c *MQClient) MaxOffset(addr, topic string, queueID uint8, to time.Duration) (int64, *rpc.Error) + func (c *MQClient) ProducerCount() int + func (c *MQClient) PullMessageAsync(addr string, header *rpc.PullHeader, to time.Duration, ...) error + func (c *MQClient) PullMessageSync(addr string, header *rpc.PullHeader, to time.Duration) (pr *rpc.PullResponse, err error) + func (c *MQClient) QueryConsumerOffset(addr, topic, group string, queueID int, timeout time.Duration) (int64, *rpc.Error) + func (c *MQClient) QueryMessageByOffset(addr string, offset int64, to time.Duration) (*message.Ext, error) + func (c *MQClient) RegisterAdmin(a Admin) error + func (c *MQClient) RegisterConsumer(co Consumer) error + func (c *MQClient) RegisterFilter(group string, subData *SubscribeData) error + func (c *MQClient) RegisterProducer(p Producer) error + func (c *MQClient) ResetConsumeOffset(addr, topic, group string, timestamp time.Time, isForce bool, ...) (map[message.Queue]int64, error) + func (c *MQClient) SearchOffsetByTimestamp(addr, topic string, queueID uint8, timestamp time.Time, to time.Duration) (int64, *rpc.Error) + func (c *MQClient) SendBack(addr string, h *rpc.SendBackHeader, to time.Duration) (err error) + func (c *MQClient) SendHeartbeat() + func (c *MQClient) SendMessageSync(broker string, data []byte, header *rpc.SendHeader, timeout time.Duration) (*rpc.SendResponse, error) + func (c *MQClient) Shutdown() + func (c *MQClient) Start() error + func (c *MQClient) UnlockMessageQueuesOneway(group, broker string, queues []message.Queue) error + func (c *MQClient) UnregisterAdmin(group string) + func (c *MQClient) UnregisterConsumer(group string) + func (c *MQClient) UnregisterProducer(group string) + func (c *MQClient) UpdateConsumerOffset(addr, topic, group string, queueID int, offset int64, timeout time.Duration) error + func (c *MQClient) UpdateConsumerOffsetOneway(addr, topic, group string, queueID int, offset int64) error + func (c *MQClient) UpdateTopicRouterInfoFromNamesrv(topic string) (err error) + type Producer interface + Group func() string + NeedUpdateTopicPublish func(topic string) bool + PublishTopics func() []string + UpdateTopicPublish func(topic string, router *route.TopicRouter) + type RunningInfo struct + Properties map[string]string + Subscriptions []*SubscribeData + type SubscribeData struct + Codes []uint32 + Expr string + FilterClassSource string + IsClassFilterMode bool + Tags []string + Topic string + Type string + Version int64 + func BuildSubscribe(group, topic, expr string, typ ExprType) *SubscribeData + func (s *SubscribeData) Equal(o *SubscribeData) bool + func (s *SubscribeData) String() string + type SubscribeDataTable struct + func NewSubcribeTable() *SubscribeDataTable + func (t *SubscribeDataTable) Datas() []*SubscribeData + func (t *SubscribeDataTable) Delete(topic string) *SubscribeData + func (t *SubscribeDataTable) Get(topic string) *SubscribeData + func (t *SubscribeDataTable) Put(topic string, d *SubscribeData) *SubscribeData + func (t *SubscribeDataTable) PutIfAbsent(topic string, d *SubscribeData) *SubscribeData + func (t *SubscribeDataTable) Topics() []string + type SubscribeQueueTable struct + func NewSubscribeQueueTable() *SubscribeQueueTable + func (t *SubscribeQueueTable) Delete(topic string) []*message.Queue + func (t *SubscribeQueueTable) Get(topic string) []*message.Queue + func (t *SubscribeQueueTable) Put(topic string, q []*message.Queue) []*message.Queue + func (t *SubscribeQueueTable) Topics() []string