Documentation ¶
Index ¶
- func BuildMQClientID(ip, unitName, instanceName string) string
- func IsTag(typ string) bool
- func ParseTags(expr string) []string
- type Admin
- type Config
- type ConsumeDirectlyResult
- type ConsumeMessageDirectlyResult
- type Consumer
- type ExprType
- type FindBrokerResult
- type MQClient
- func (c *MQClient) AdminCount() int
- func (c *MQClient) ConsumeMessageDirectly(addr, group, clientID, offsetID string) (ConsumeMessageDirectlyResult, error)
- func (c *MQClient) ConsumerCount() int
- func (c *MQClient) CreateOrUpdateTopic(addr string, header *rpc.CreateOrUpdateTopicHeader, to time.Duration) error
- func (c *MQClient) DeleteTopicInBroker(addr, topic string, to time.Duration) error
- func (c *MQClient) DeleteTopicInNamesrv(addr, topic string, to time.Duration) error
- func (c *MQClient) FindAnyBrokerAddr(brokerName string) (*FindBrokerResult, error)
- func (c *MQClient) FindBrokerAddr(brokerName string, hintBrokerID int32, lock bool) (*FindBrokerResult, error)
- func (c *MQClient) GetBrokerClusterInfo(addr string, to time.Duration) (*route.ClusterInfo, error)
- func (c *MQClient) GetConsumerIDs(addr, group string, to time.Duration) (ids []string, err error)
- func (c *MQClient) GetMasterBrokerAddr(brokerName string) string
- func (c *MQClient) GetMasterBrokerAddrs() []string
- func (c *MQClient) LockMessageQueues(broker, group string, queues []message.Queue, to time.Duration) ([]message.Queue, error)
- func (c *MQClient) MaxOffset(addr, topic string, queueID uint8, to time.Duration) (int64, *rpc.Error)
- func (c *MQClient) ProducerCount() int
- func (c *MQClient) PullMessageAsync(addr string, header *rpc.PullHeader, to time.Duration, ...) error
- func (c *MQClient) PullMessageSync(addr string, header *rpc.PullHeader, to time.Duration) (pr *rpc.PullResponse, err error)
- func (c *MQClient) QueryConsumerOffset(addr, topic, group string, queueID int, timeout time.Duration) (int64, *rpc.Error)
- func (c *MQClient) QueryMessageByOffset(addr string, offset int64, to time.Duration) (*message.Ext, error)
- func (c *MQClient) RegisterAdmin(a Admin) error
- func (c *MQClient) RegisterConsumer(co Consumer) error
- func (c *MQClient) RegisterFilter(group string, subData *SubscribeData) error
- func (c *MQClient) RegisterProducer(p Producer) error
- func (c *MQClient) ResetConsumeOffset(addr, topic, group string, timestamp time.Time, isForce bool, ...) (map[message.Queue]int64, error)
- func (c *MQClient) SearchOffsetByTimestamp(addr, topic string, queueID uint8, timestamp time.Time, to time.Duration) (int64, *rpc.Error)
- func (c *MQClient) SendBack(addr string, h *rpc.SendBackHeader, to time.Duration) (err error)
- func (c *MQClient) SendHeartbeat()
- func (c *MQClient) SendMessageSync(broker string, data []byte, header *rpc.SendHeader, timeout time.Duration) (*rpc.SendResponse, error)
- func (c *MQClient) Shutdown()
- func (c *MQClient) Start() error
- func (c *MQClient) UnlockMessageQueuesOneway(group, broker string, queues []message.Queue) error
- func (c *MQClient) UnregisterAdmin(group string)
- func (c *MQClient) UnregisterConsumer(group string)
- func (c *MQClient) UnregisterProducer(group string)
- func (c *MQClient) UpdateConsumerOffset(addr, topic, group string, queueID int, offset int64, timeout time.Duration) error
- func (c *MQClient) UpdateConsumerOffsetOneway(addr, topic, group string, queueID int, offset int64) error
- func (c *MQClient) UpdateTopicRouterInfoFromNamesrv(topic string) (err error)
- type Producer
- type RunningInfo
- type SubscribeData
- type SubscribeDataTable
- func (t *SubscribeDataTable) Datas() []*SubscribeData
- func (t *SubscribeDataTable) Delete(topic string) *SubscribeData
- func (t *SubscribeDataTable) Get(topic string) *SubscribeData
- func (t *SubscribeDataTable) Put(topic string, d *SubscribeData) *SubscribeData
- func (t *SubscribeDataTable) PutIfAbsent(topic string, d *SubscribeData) *SubscribeData
- func (t *SubscribeDataTable) Topics() []string
- type SubscribeQueueTable
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildMQClientID ¶
BuildMQClientID build the mq client ID
Types ¶
type Config ¶
type Config struct { HeartbeatBrokerInterval time.Duration PollNameServerInterval time.Duration NameServerAddrs []string }
Config the remote client configurations
type ConsumeDirectlyResult ¶
type ConsumeDirectlyResult = rpc.ConsumeDirectlyResult
ConsumeDirectlyResult the flag of the consuming directly
const ( Success ConsumeDirectlyResult = iota Later Rollback Commit Error ReturnNil )
predefined `ConsumeDirectlyResult` values
type ConsumeMessageDirectlyResult ¶
type ConsumeMessageDirectlyResult = rpc.ConsumeMessageDirectlyResult
ConsumeMessageDirectlyResult consume the message directly, sending by the broker
type Consumer ¶
type Consumer interface { Group() string SubscribeTopics() []string UpdateTopicSubscribe(topic string, router *route.TopicRouter) NeedUpdateTopicSubscribe(topic string) bool ConsumeFromWhere() string Model() string Type() string UnitMode() bool Subscriptions() []*SubscribeData ReblanceQueue() RunningInfo() RunningInfo ResetOffset(topic string, offsets map[message.Queue]int64) error ConsumeMessageDirectly(msg *message.Ext, broker string) (ConsumeMessageDirectlyResult, error) }
Consumer interface needed by rebalance
type ExprType ¶
type ExprType string
ExprType the filter type of the subscription
const ( // ExprTypeTag tag filter // Only support or operation such as // "tag1 || tag2 || tag3", <br> // If null or * expression,meaning subscribe all. ExprTypeTag ExprType = "TAG" // ExprTypeSQL92 sql filter // //Keywords: //AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL //Boolean, like: TRUE, FALSE //String, like: 'abc' //Decimal, like: 123 //Float number, like: 3.1415 // //Grammar: //AND, OR //>, >=, <, <=, = //BETWEEN A AND B, equals to >=A AND <=B //NOT BETWEEN A AND B, equals to >B OR <A //IN ('a', 'b'), equals to ='a' OR ='b', this operation only support String type. //IS NULL, IS NOT NULL, check parameter whether is null, or not. //=TRUE, =FALSE, check parameter whether is true, or false. // //Example: (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) ExprTypeSQL92 ExprType = "SQL92" // ExprAll the tags indicates subscribing all the message ExprAll = "*" )
type FindBrokerResult ¶
FindBrokerResult the data returned by FindBrokerAddr
type MQClient ¶
type MQClient struct { Config sync.RWMutex sync.WaitGroup remote.Client // contains filtered or unexported fields }
MQClient the client that communicates with the broker; it is shared by all consumers & producers with the same client id
func (*MQClient) AdminCount ¶
AdminCount return the registered admin count
func (*MQClient) ConsumeMessageDirectly ¶
func (c *MQClient) ConsumeMessageDirectly(addr, group, clientID, offsetID string) (ConsumeMessageDirectlyResult, error)
ConsumeMessageDirectly send the request to broker to push the message specified by the offsetID to the specified client in the group
func (*MQClient) ConsumerCount ¶
ConsumerCount return the registered consumer count
func (*MQClient) CreateOrUpdateTopic ¶
func (c *MQClient) CreateOrUpdateTopic( addr string, header *rpc.CreateOrUpdateTopicHeader, to time.Duration, ) error
CreateOrUpdateTopic creates or updates the topic in the broker
func (*MQClient) DeleteTopicInBroker ¶
DeleteTopicInBroker delete topic in the broker
func (*MQClient) DeleteTopicInNamesrv ¶
DeleteTopicInNamesrv deletes the topic in the namesrv
func (*MQClient) FindAnyBrokerAddr ¶
func (c *MQClient) FindAnyBrokerAddr(brokerName string) ( *FindBrokerResult, error, )
FindAnyBrokerAddr returns any broker whose name is the specified name
func (*MQClient) FindBrokerAddr ¶
func (c *MQClient) FindBrokerAddr(brokerName string, hintBrokerID int32, lock bool) ( *FindBrokerResult, error, )
FindBrokerAddr finds the broker address; it returns the address with the specified broker id first, otherwise it returns any one
func (*MQClient) GetBrokerClusterInfo ¶
GetBrokerClusterInfo get the cluster info from the namesrv
func (*MQClient) GetConsumerIDs ¶
GetConsumerIDs gets the client ids from the broker (wrapper)
func (*MQClient) GetMasterBrokerAddr ¶
GetMasterBrokerAddr returns the master broker address
func (*MQClient) GetMasterBrokerAddrs ¶
GetMasterBrokerAddrs returns all the master broker addresses
func (*MQClient) LockMessageQueues ¶
func (c *MQClient) LockMessageQueues( broker, group string, queues []message.Queue, to time.Duration, ) ( []message.Queue, error, )
LockMessageQueues send lock message queue request to the broker
func (*MQClient) MaxOffset ¶
func (c *MQClient) MaxOffset(addr, topic string, queueID uint8, to time.Duration) ( int64, *rpc.Error, )
MaxOffset returns the max offset in the consume queue
func (*MQClient) ProducerCount ¶
ProducerCount return the registered producer count
func (*MQClient) PullMessageAsync ¶
func (c *MQClient) PullMessageAsync( addr string, header *rpc.PullHeader, to time.Duration, callback func(*rpc.PullResponse, error), ) error
PullMessageAsync pull message async
func (*MQClient) PullMessageSync ¶
func (c *MQClient) PullMessageSync(addr string, header *rpc.PullHeader, to time.Duration) ( pr *rpc.PullResponse, err error, )
PullMessageSync pull message sync
func (*MQClient) QueryConsumerOffset ¶
func (c *MQClient) QueryConsumerOffset(addr, topic, group string, queueID int, timeout time.Duration) ( int64, *rpc.Error, )
QueryConsumerOffset queries the consumer offset (wrapper)
func (*MQClient) QueryMessageByOffset ¶
func (c *MQClient) QueryMessageByOffset(addr string, offset int64, to time.Duration) ( *message.Ext, error, )
QueryMessageByOffset queries the message by message id
func (*MQClient) RegisterAdmin ¶
RegisterAdmin registers admin
func (*MQClient) RegisterConsumer ¶
RegisterConsumer registers consumer
func (*MQClient) RegisterFilter ¶
func (c *MQClient) RegisterFilter(group string, subData *SubscribeData) error
RegisterFilter register the filter to the broker
func (*MQClient) RegisterProducer ¶
RegisterProducer registers producer
func (*MQClient) ResetConsumeOffset ¶
func (c *MQClient) ResetConsumeOffset( addr, topic, group string, timestamp time.Time, isForce bool, timeout time.Duration, ) ( map[message.Queue]int64, error, )
ResetConsumeOffset requests the broker to reset the offsets of the specified topic, the offsets' owner is specified by the group
func (*MQClient) SearchOffsetByTimestamp ¶
func (c *MQClient) SearchOffsetByTimestamp( addr, topic string, queueID uint8, timestamp time.Time, to time.Duration, ) ( int64, *rpc.Error, )
SearchOffsetByTimestamp returns the offset of the specified message queue and the timestamp
func (*MQClient) SendHeartbeat ¶
func (c *MQClient) SendHeartbeat()
SendHeartbeat sends the heartbeat to the broker server; it holds the lock while sending the data
func (*MQClient) SendMessageSync ¶
func (c *MQClient) SendMessageSync( broker string, data []byte, header *rpc.SendHeader, timeout time.Duration, ) ( *rpc.SendResponse, error, )
SendMessageSync send message to the broker
func (*MQClient) UnlockMessageQueuesOneway ¶
UnlockMessageQueuesOneway send unlock message queue request to the broker
func (*MQClient) UnregisterAdmin ¶
UnregisterAdmin unregisters the admin
func (*MQClient) UnregisterConsumer ¶
UnregisterConsumer unregisters the consumer
func (*MQClient) UnregisterProducer ¶
UnregisterProducer unregister producer
func (*MQClient) UpdateConsumerOffset ¶
func (c *MQClient) UpdateConsumerOffset( addr, topic, group string, queueID int, offset int64, timeout time.Duration, ) error
UpdateConsumerOffset updates the offset of the consumer queue (wrapper)
func (*MQClient) UpdateConsumerOffsetOneway ¶
func (c *MQClient) UpdateConsumerOffsetOneway( addr, topic, group string, queueID int, offset int64, ) error
UpdateConsumerOffsetOneway updates the offset of the consumer queue (wrapper)
func (*MQClient) UpdateTopicRouterInfoFromNamesrv ¶
UpdateTopicRouterInfoFromNamesrv updates the topic router info for the producer/consumer from the namesrv
type Producer ¶
type Producer interface { Group() string PublishTopics() []string UpdateTopicPublish(topic string, router *route.TopicRouter) NeedUpdateTopicPublish(topic string) bool }
Producer interface needed by rebalance
type RunningInfo ¶
type RunningInfo struct { Properties map[string]string `json:"properties"` Subscriptions []*SubscribeData `json:"subscriptionSet"` }
RunningInfo consumer running information
type SubscribeData ¶
type SubscribeData struct { Topic string `json:"topic"` Expr string `json:"subString"` Type string `json:"expressionType"` Tags []string `json:"tagsSet"` Codes []uint32 `json:"codeSet"` Version int64 `json:"subVersion"` IsClassFilterMode bool `json:"classFilterMode"` FilterClassSource string `json:"-"` }
SubscribeData subscription information
func BuildSubscribe ¶
func BuildSubscribe(group, topic, expr string, typ ExprType) *SubscribeData
BuildSubscribe build the subscribe data with tag type
func (*SubscribeData) Equal ¶
func (s *SubscribeData) Equal(o *SubscribeData) bool
Equal returns true if equals another, otherwise false
func (*SubscribeData) String ¶
func (s *SubscribeData) String() string
type SubscribeDataTable ¶
type SubscribeDataTable struct {
// contains filtered or unexported fields
}
SubscribeDataTable contains the subscription information of topic, the operations are thread-safe. NOTE: do not modify directly
func NewSubcribeTable ¶
func NewSubcribeTable() *SubscribeDataTable
NewSubcribeTable creates one DataTable
func (*SubscribeDataTable) Datas ¶
func (t *SubscribeDataTable) Datas() []*SubscribeData
Datas returns the subscribed datas
func (*SubscribeDataTable) Delete ¶
func (t *SubscribeDataTable) Delete(topic string) *SubscribeData
Delete deletes the data of the specified topic, and return the previous one
func (*SubscribeDataTable) Get ¶
func (t *SubscribeDataTable) Get(topic string) *SubscribeData
Get returns the subscribe data of the topic
func (*SubscribeDataTable) Put ¶
func (t *SubscribeDataTable) Put(topic string, d *SubscribeData) *SubscribeData
Put stores the subscribe data and returns the previous one
func (*SubscribeDataTable) PutIfAbsent ¶
func (t *SubscribeDataTable) PutIfAbsent(topic string, d *SubscribeData) *SubscribeData
PutIfAbsent stores the subscribe data if the topic is absent and returns the previous one
func (*SubscribeDataTable) Topics ¶
func (t *SubscribeDataTable) Topics() []string
Topics returns the topics
type SubscribeQueueTable ¶
type SubscribeQueueTable struct {
// contains filtered or unexported fields
}
SubscribeQueueTable contains the message queues of topic, the operations are thread-safe
func NewSubscribeQueueTable ¶
func NewSubscribeQueueTable() *SubscribeQueueTable
NewSubscribeQueueTable creates the QueueTable
func (*SubscribeQueueTable) Delete ¶
func (t *SubscribeQueueTable) Delete(topic string) []*message.Queue
Delete deletes the queues of the specified topic, and returns the previous ones
func (*SubscribeQueueTable) Get ¶
func (t *SubscribeQueueTable) Get(topic string) []*message.Queue
Get returns the queues of the topic
func (*SubscribeQueueTable) Topics ¶
func (t *SubscribeQueueTable) Topics() []string
Topics returns the topics