Versions in this module Expand all Collapse all v2 v2.1.2 Sep 9, 2022 v2.1.1 Sep 9, 2022 Changes in this version + const ClientInnerProducerGroup + const ConsumerType + const DEFAULT_NAMESRV_ADDR + const DefaultConsumerGroup + const EnvNameServerAddr + const MasterId + const ProducerType + const PropClientVersion + const PropConsumeOrderly + const PropConsumeType + const PropConsumerStartTimestamp + const PropNameServerAddr + const PropThreadPoolCoreSize + const ReplyMessageFlag + const ReplyTopicPostfix + const ReqCheckTransactionState + const ReqConsumeMessageDirectly + const ReqConsumerSendMsgBack + const ReqCreateTopic + const ReqDeleteTopicInBroker + const ReqDeleteTopicInNameSrv + const ReqENDTransaction + const ReqGetAllTopicListFromNameServer + const ReqGetBrokerClusterInfo + const ReqGetConsumerListByGroup + const ReqGetConsumerRunningInfo + const ReqGetMaxOffset + const ReqGetMinOffset + const ReqGetRouteInfoByTopic + const ReqHeartBeat + const ReqLockBatchMQ + const ReqNotifyConsumerIdsChanged + const ReqPullMessage + const ReqPushReplyMessageToClient + const ReqQueryConsumerOffset + const ReqQueryMessage + const ReqResetConsumerOffset + const ReqSearchOffsetByTimestamp + const ReqSendBatchMessage + const ReqSendMessage + const ReqSendReplyMessage + const ReqSendReplyMessageV2 + const ReqUnlockBatchMQ + const ReqUpdateConsumerOffset + const ReqViewMessageByID + const ResError + const ResFlushDiskTimeout + const ResFlushSlaveTimeout + const ResPullNotFound + const ResPullOffsetMoved + const ResPullRetryImmediately + const ResQueryNotFount + const ResSlaveNotAvailable + const ResSuccess + const ResTopicNotExist + const RetryGroupTopicPrefix + const RmqSysTraceTopic + const SystemTopicPrefix + const TraceGroupName + const TraceTopicPrefix + const V4_1_0 + var ErrIllegalIP = errors.New("IP addr error") + var ErrMultiIP = errors.New("multiple IP addr does not support") + var ErrNoNameserver = errors.New("nameServerAddrs can't be empty.") + var ErrServiceState = errors2.ErrService + var RequestResponseFutureMap = NewRequestResponseFutureMap() + func GetNamesrv(clientId string) (*namesrvs, error) + func GetReplyTopic(clusterName string) string + func GetRetryTopic(group string) string + func NewHeartbeatData(clientID string) *heartbeatData + func NewNamesrv(resolver primitive.NsResolver, config *remote.RemotingClientConfig) (*namesrvs, error) + func NewRequestResponseFutureMap() *requestResponseFutureCache + func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher + func ValidateGroup(group string) + type BrokerData struct + BrokerAddresses map[int64]string + BrokerName string + Cluster string + func (b *BrokerData) Equals(bd *BrokerData) bool + type CheckTransactionStateCallback struct + Addr net.Addr + Header CheckTransactionStateRequestHeader + Msg *primitive.MessageExt + type CheckTransactionStateRequestHeader struct + CommitLogOffset int64 + MsgId string + OffsetMsgId string + TranStateTableOffset int64 + TransactionId string + func (request *CheckTransactionStateRequestHeader) Decode(properties map[string]string) + func (request *CheckTransactionStateRequestHeader) Encode() map[string]string + type ClientOptions struct + ClientIP string + Credentials primitive.Credentials + GroupName string + InstanceName string + Interceptors []primitive.Interceptor + NameServerAddrs primitive.NamesrvAddr + Namespace string + Namesrv Namesrvs + RemotingClientConfig *remote.RemotingClientConfig + Resolver primitive.NsResolver + RetryTimes int + UnitMode bool + UnitName string + VIPChannelEnabled bool + func DefaultClientOptions() ClientOptions + func (opt *ClientOptions) ChangeInstanceNameToPID() + func (opt *ClientOptions) String() string + type ConsumeMessageDirectlyHeader struct + func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string) + func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string + type ConsumeMessageDirectlyResult struct + AutoCommit bool + ConsumeResult ConsumeResult + Order bool + Remark string + SpentTimeMills int64 + func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) + type ConsumeResult int + const Commit + const ConsumeRetryLater + const ConsumeSuccess + const ReturnNull + const Rollback + const ThrowException + type ConsumeStatus struct + ConsumeFailedMsgs int64 + ConsumeFailedTPS float64 + ConsumeOKTPS float64 + ConsumeRT float64 + PullRT float64 + PullTPS float64 + type ConsumerRunningInfo struct + JStack string + MQTable map[primitive.MessageQueue]ProcessQueueInfo + Properties map[string]string + StatusTable map[string]ConsumeStatus + SubscriptionData map[*SubscriptionData]bool + func NewConsumerRunningInfo() *ConsumerRunningInfo + func (info ConsumerRunningInfo) Encode() ([]byte, error) + type ConsumerSendMsgBackRequestHeader struct + DelayLevel int + Group string + MaxReconsumeTimes int32 + Offset int64 + OriginMsgId string + OriginTopic string + UnitMode bool + func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string + type CreateTopicRequestHeader struct + DefaultTopic string + Order bool + Perm int + ReadQueueNums int + Topic string + TopicFilterType string + TopicSysFlag int + WriteQueueNums int + func (request *CreateTopicRequestHeader) Encode() map[string]string + type DeleteTopicRequestHeader struct + Topic string + func (request *DeleteTopicRequestHeader) Encode() map[string]string + type EndTransactionRequestHeader struct + CommitLogOffset int64 + CommitOrRollback int + FromTransactionCheck bool + MsgID string + ProducerGroup string + TranStateTableOffset int64 + TransactionId string + func (request *EndTransactionRequestHeader) Encode() map[string]string + type FindBrokerResult struct + BrokerAddr string + BrokerVersion int32 + Slave bool + type GetConsumerListRequestHeader struct + ConsumerGroup string + func (request *GetConsumerListRequestHeader) Encode() map[string]string + type GetConsumerRunningInfoHeader struct + func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string) + func (request *GetConsumerRunningInfoHeader) Encode() map[string]string + type GetMaxOffsetRequestHeader struct + QueueId int + Topic string + func (request *GetMaxOffsetRequestHeader) Encode() map[string]string + type GetRouteInfoRequestHeader struct + Topic string + func (request *GetRouteInfoRequestHeader) Encode() map[string]string + type InnerConsumer interface + ConsumeMessageDirectly func(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult + GetConsumerRunningInfo func(stack bool) *ConsumerRunningInfo + GetModel func() string + GetWhere func() string + GetcType func() string + IsSubscribeTopicNeedUpdate func(topic string) bool + IsUnitMode func() bool + PersistConsumerOffset func() error + Rebalance func() + RebalanceIfNotPaused func() + ResetOffset func(topic string, table map[primitive.MessageQueue]int64) + SubscriptionDataList func() []*SubscriptionData + UpdateTopicSubscribeInfo func(topic string, mqs []*primitive.MessageQueue) + type InnerProducer interface + IsPublishTopicNeedUpdate func(topic string) bool + IsUnitMode func() bool + PublishTopicList func() []string + UpdateTopicPublishInfo func(topic string, info *TopicPublishInfo) + type Keyset map[string]struct + type MockInnerConsumer struct + func NewMockInnerConsumer(ctrl *gomock.Controller) *MockInnerConsumer + func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder + func (m *MockInnerConsumer) GetConsumerRunningInfo() *ConsumerRunningInfo + func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool + func (m *MockInnerConsumer) IsUnitMode() bool + func (m *MockInnerConsumer) PersistConsumerOffset() error + func (m *MockInnerConsumer) Rebalance() + func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData + func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) + type MockInnerConsumerMockRecorder struct + func (mr *MockInnerConsumerMockRecorder) GetConsumerRunningInfo() *gomock.Call + func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic interface{}) *gomock.Call + func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call + func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call + func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call + func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call + func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs interface{}) *gomock.Call + type MockInnerProducer struct + func NewMockInnerProducer(ctrl *gomock.Controller) *MockInnerProducer + func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder + func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool + func (m *MockInnerProducer) IsUnitMode() bool + func (m *MockInnerProducer) PublishTopicList() []string + func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo) + type MockInnerProducerMockRecorder struct + func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic interface{}) *gomock.Call + func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call + func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call + func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info interface{}) *gomock.Call + type MockNamesrvs struct + func NewMockNamesrvs(ctrl *gomock.Controller) *MockNamesrvs + func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) + func (m *MockNamesrvs) AddrList() []string + func (m *MockNamesrvs) EXPECT() *MockNamesrvsMockRecorder + func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) + func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) + func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string + func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string + func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult + func (m *MockNamesrvs) UpdateNameServerAddress() + func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) + func (m *MockNamesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) + type MockNamesrvsMockRecorder struct + func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call + func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call + func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress() *gomock.Call + func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call + type MockRMQClient struct + Namesrv *MockNamesrvs + func NewMockRMQClient(ctrl *gomock.Controller) *MockRMQClient + func (m *MockRMQClient) CheckClientInBroker() + func (m *MockRMQClient) ClientID() string + func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder + func (m *MockRMQClient) GetNameSrv() Namesrvs + func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, ...) error + func (m *MockRMQClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand, ...) error + func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, ...) (*remote.RemotingCommand, error) + func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, ...) error + func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) + func (m *MockRMQClient) RebalanceImmediately() + func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) error + func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) error + func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() + func (m *MockRMQClient) SetNameSrv(mockNamesrvs *MockNamesrvs) + func (m *MockRMQClient) Shutdown() + func (m *MockRMQClient) Start() + func (m *MockRMQClient) UnregisterConsumer(group string) + func (m *MockRMQClient) UnregisterProducer(group string) + func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) + func (m *MockRMQClient) UpdateTopicRouteInfo() + type MockRMQClientMockRecorder struct + func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call + func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call + func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, f interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) InvokeOneWay(ctx, addr, request, timeoutMillis interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) InvokeSync(ctx, addr, request, timeoutMillis interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp interface{}, msgs ...interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call + func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() *gomock.Call + func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call + func (mr *MockRMQClientMockRecorder) Start() *gomock.Call + func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) UnregisterProducer(group interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data, changed interface{}) *gomock.Call + func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call + type Namesrvs interface + AddBroker func(routeData *TopicRouteData) + AddrList func() []string + FetchPublishMessageQueues func(topic string) ([]*primitive.MessageQueue, error) + FetchSubscribeMessageQueues func(topic string) ([]*primitive.MessageQueue, error) + FindBrokerAddrByName func(brokerName string) string + FindBrokerAddrByTopic func(topic string) string + FindBrokerAddressInSubscribe func(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult + UpdateNameServerAddress func() + UpdateTopicRouteInfo func(topic string) (routeData *TopicRouteData, changed bool, err error) + UpdateTopicRouteInfoWithDefault func(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) + type ProcessQueueInfo struct + CachedMsgCount int + CachedMsgMaxOffset int64 + CachedMsgMinOffset int64 + CachedMsgSizeInMiB int64 + CommitOffset int64 + Dropped bool + LastConsumeTimestamp int64 + LastLockTimestamp int64 + LastPullTimestamp int64 + Locked bool + TransactionMsgCount int + TransactionMsgMaxOffset int64 + TransactionMsgMinOffset int64 + TryUnlockTimes int64 + type PullMessageRequestHeader struct + CommitOffset int64 + ConsumerGroup string + ExpressionType string + MaxMsgNums int32 + QueueId int32 + QueueOffset int64 + SubExpression string + SubVersion int64 + SuspendTimeoutMillis time.Duration + SysFlag int32 + Topic string + func (request *PullMessageRequestHeader) Encode() map[string]string + type PullMessageResponse struct + MaxOffset int64 + MinOffset int64 + NextBeginOffset int64 + SuggestWhichBrokerId int64 + type QueryConsumerOffsetRequestHeader struct + ConsumerGroup string + QueueId int + Topic string + func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string + type QueryMessageRequestHeader struct + BeginTimestamp int64 + EndTimestamp int64 + Key string + MaxNum int + Topic string + func (request *QueryMessageRequestHeader) Decode(properties map[string]string) error + func (request *QueryMessageRequestHeader) Encode() map[string]string + type QueueData struct + BrokerName string + Perm int + ReadQueueNums int + TopicSynFlag int + WriteQueueNums int + func (q *QueueData) Equals(qd *QueueData) bool + type RMQClient interface + CheckClientInBroker func() + ClientID func() string + GetNameSrv func() Namesrvs + InvokeAsync func(ctx context.Context, addr string, request *remote.RemotingCommand, ...) error + InvokeOneWay func(ctx context.Context, addr string, request *remote.RemotingCommand, ...) error + InvokeSync func(ctx context.Context, addr string, request *remote.RemotingCommand, ...) (*remote.RemotingCommand, error) + ProcessSendResponse func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, ...) error + PullMessage func(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) + RebalanceImmediately func() + RegisterConsumer func(group string, consumer InnerConsumer) error + RegisterProducer func(group string, producer InnerProducer) error + SendHeartbeatToAllBrokerWithLock func() + Shutdown func() + Start func() + UnregisterConsumer func(group string) + UnregisterProducer func(group string) + UpdatePublishInfo func(topic string, data *TopicRouteData, changed bool) + UpdateTopicRouteInfo func() + func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient + type ReplyMessageRequestHeader struct + func (request *ReplyMessageRequestHeader) Decode(properties map[string]string) + func (request *ReplyMessageRequestHeader) Encode() map[string]string + type RequestCallback func(ctx context.Context, msg *primitive.Message, err error) + type RequestResponseFuture struct + BeginTime time.Time + CauseErr error + CorrelationId string + Done chan struct{} + RequestCallback RequestCallback + ResponseMsg *primitive.Message + SendRequestOk bool + Timeout time.Duration + func NewRequestResponseFuture(correlationId string, timeout time.Duration, callback RequestCallback) *RequestResponseFuture + func (rf *RequestResponseFuture) ExecuteRequestCallback() + func (rf *RequestResponseFuture) IsTimeout() bool + func (rf *RequestResponseFuture) PutResponseMessage(message *primitive.Message) + func (rf *RequestResponseFuture) WaitResponseMessage(reqMsg *primitive.Message) (*primitive.Message, error) + type ResetOffsetBody struct + OffsetTable map[primitive.MessageQueue]int64 + func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) + type ResetOffsetHeader struct + func (request *ResetOffsetHeader) Decode(properties map[string]string) + func (request *ResetOffsetHeader) Encode() map[string]string + type SearchOffsetRequestHeader struct + QueueId int + Timestamp int64 + Topic string + func (request *SearchOffsetRequestHeader) Encode() map[string]string + type SendMessageRequestHeader struct + Batch bool + BornTimestamp int64 + DefaultTopic string + DefaultTopicQueueNums int + Flag int32 + MaxReconsumeTimes int + ProducerGroup string + Properties string + QueueId int + ReconsumeTimes int + SysFlag int + Topic string + UnitMode bool + func (request *SendMessageRequestHeader) Encode() map[string]string + type SendMessageRequestV2Header struct + func (request *SendMessageRequestV2Header) Encode() map[string]string + type SendMessageResponse struct + MsgId string + MsgRegion string + QueueId int32 + QueueOffset int64 + TransactionId string + func (response *SendMessageResponse) Decode(properties map[string]string) + type ServiceState int32 + const StateCreateJust + const StateRunning + const StateShutdown + const StateStartFailed + type SubscriptionData struct + ClassFilterMode bool + Codes utils.Set + ExpType string + SubString string + SubVersion int64 + Tags utils.Set + Topic string + type TopicListRequestHeader struct + Topic string + func (request *TopicListRequestHeader) Encode() map[string]string + type TopicPublishInfo struct + HaveTopicRouterInfo bool + MqList []*primitive.MessageQueue + OrderTopic bool + RouteData *TopicRouteData + TopicQueueIndex int32 + type TopicRouteData struct + BrokerDataList []*BrokerData + OrderTopicConf string + QueueDataList []*QueueData + func (routeData *TopicRouteData) String() string + type TraceBean struct + BodyLength int + ClientHost string + Keys string + MsgId string + MsgType primitive.MessageType + OffsetMsgId string + RetryTimes int + StoreHost string + StoreTime int64 + Tags string + Topic string + type TraceContext struct + ContextCode int + CostTime int64 + GroupName string + IsSuccess bool + RegionId string + RegionName string + RequestId string + TimeStamp int64 + TraceBeans []TraceBean + TraceType TraceType + type TraceDispatcher interface + Append func(ctx TraceContext) bool + Close func() + GetTraceTopicName func() string + Start func() + type TraceTransferBean struct + type TraceType string + const Pub + const SubAfter + const SubBefore + type TransactionListener interface + type UpdateConsumerOffsetRequestHeader struct + CommitOffset int64 + ConsumerGroup string + QueueId int + Topic string + func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string + type ViewMessageRequestHeader struct + Offset int64 + func (request *ViewMessageRequestHeader) Encode() map[string]string