Versions in this module Expand all Collapse all v2 v2.1.4 Nov 7, 2023 Changes in this version + const Mb + const SQL92 + const TAG + var ErrNoNewMsg = errors.New("no new message found") + func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func CreateReplyMessage(requestMessage *primitive.MessageExt, body []byte) (*primitive.Message, error) + func GetReplyToClient(msg *primitive.MessageExt) string + func IsNoNewMsgError(err error) bool + func IsTagType(exp string) bool + func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) + func NewPushConsumer(opts ...Option) (*pushConsumer, error) + type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue + func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy + func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy + func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy + type ConsumeFromWhere int + const ConsumeFromFirstOffset + const ConsumeFromLastOffset + const ConsumeFromTimestamp + type ConsumeRequest struct + func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue + func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt + func (cr *ConsumeRequest) GetPQ() *processQueue + type ConsumeResult int + const Commit + const ConsumeRetryLater + const ConsumeSuccess + const Rollback + const SuspendCurrentQueueAMoment + type ConsumeResultHolder struct + type ConsumeStatus struct + ConsumeFailedMsgs int64 + ConsumeFailedTPS float64 + ConsumeOKTPS float64 + ConsumeRT float64 + PullRT float64 + PullTPS float64 + type ConsumeType string + type ConsumerReturn int + const ExceptionReturn + const FailedReturn + const NullReturn + const SuccessReturn + const TimeoutReturn + type ExpressionType string + type Limiter func(topic string) + type MessageModel int + const BroadCasting + const Clustering + func (mode MessageModel) String() string + type MessageQueueKey primitive.MessageQueue + func (mq *MessageQueueKey) UnmarshalText(text []byte) error + func (mq MessageQueueKey) MarshalText() (text []byte, err error) + type MessageSelector struct + Expression string + Type ExpressionType + type MockOffsetStore struct + func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore + func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder + type MockOffsetStoreMockRecorder struct + type OffsetSerializeWrapper struct + OffsetTable map[MessageQueueKey]int64 + type OffsetStore interface + func NewLocalFileOffsetStore(clientID, group string) OffsetStore + func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore + type Option func(*consumerOptions) + func WithAutoCommit(auto bool) Option + func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option + func WithConsumeFromWhere(w ConsumeFromWhere) Option + func WithConsumeGoroutineNums(nums int) Option + func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option + func WithConsumeTPS(tps int32) Option + func WithConsumeTimeout(timeout time.Duration) Option + func WithConsumeTimestamp(consumeTimestamp string) Option + func WithConsumerModel(m MessageModel) Option + func WithConsumerOrder(order bool) Option + func WithConsumerPullTimeout(consumerPullTimeout time.Duration) Option + func WithCredentials(c primitive.Credentials) Option + func WithFilterMessageHook(hooks []hooks.FilterMessageHook) Option + func WithGroupName(group string) Option + func WithInstance(name string) Option + func WithInterceptor(fs ...primitive.Interceptor) Option + func WithLimiter(limiter Limiter) Option + func WithMaxReconsumeTimes(times int32) Option + func WithNameServer(nameServers primitive.NamesrvAddr) Option + func WithNameServerDomain(nameServerUrl string) Option + func WithNamespace(namespace string) Option + func WithNsResolver(resolver primitive.NsResolver) Option + func WithPullBatchSize(batchSize int32) Option + func WithPullInterval(interval time.Duration) Option + func WithPullThresholdForQueue(pullThresholdForQueue int64) Option + func WithPullThresholdForTopic(pullThresholdForTopic int) Option + func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option + func WithPullThresholdSizeForTopic(pullThresholdSizeForTopic int) Option + func WithRebalanceLockInterval(interval time.Duration) Option + func WithRetry(retries int) Option + func WithStrategy(strategy AllocateStrategy) Option + func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option + func WithTls(useTls bool) Option + func WithTrace(traceCfg *primitive.TraceConfig) Option + func WithUnitName(unitName string) Option + func WithVIPChannel(enable bool) Option + type PullRequest struct + func (pr *PullRequest) String() string + type PushConsumerCallback struct + func (callback PushConsumerCallback) UniqueID() string + type QueueLock struct + type StatsManager struct + func NewStatsManager() *StatsManager + func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus + func (mgr *StatsManager) ShutDownStat()