Documentation ¶
Overview ¶
Package consumer is a generated GoMock package.
Index ¶
- Constants
- Variables
- func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func CreateReplyMessage(requestMessage *primitive.MessageExt, body []byte) (*primitive.Message, error)
- func GetReplyToClient(msg *primitive.MessageExt) string
- func IsNoNewMsgError(err error) bool
- func IsTagType(exp string) bool
- func NewPushConsumer(opts ...Option) (*pushConsumer, error)
- type AllocateStrategy
- type ConsumeFromWhere
- type ConsumeRequest
- type ConsumeResult
- type ConsumeResultHolder
- type ConsumeStatus
- type ConsumeType
- type ConsumerReturn
- type DefaultPullConsumer
- func (pc *DefaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult)
- func (pc *DefaultPullConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult
- func (pc *DefaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error)
- func (pc *DefaultPullConsumer) GetClient() internal.RMQClient
- func (pc *DefaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo
- func (pc *DefaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus
- func (pc *DefaultPullConsumer) GetModel() string
- func (pc *DefaultPullConsumer) GetWhere() string
- func (pc *DefaultPullConsumer) GetcType() string
- func (pc *DefaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool
- func (pc *DefaultPullConsumer) IsUnitMode() bool
- func (pc *DefaultPullConsumer) PersistConsumerOffset() error
- func (pc *DefaultPullConsumer) PersistOffset(ctx context.Context, topic string) error
- func (pc *DefaultPullConsumer) Poll(ctx context.Context, timeout time.Duration) (*ConsumeRequest, error)
- func (pc *DefaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)
- func (pc *DefaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
- func (pc *DefaultPullConsumer) Rebalance()
- func (pc *DefaultPullConsumer) RebalanceIfNotPaused()
- func (pc *DefaultPullConsumer) ReleaseSubGroup(group string) bool
- func (pc *DefaultPullConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64)
- func (pc *DefaultPullConsumer) ServiceRunningOk() bool
- func (pc *DefaultPullConsumer) Shutdown() error
- func (pc *DefaultPullConsumer) Start() error
- func (pc *DefaultPullConsumer) Subscribe(topic string, selector MessageSelector) error
- func (pc *DefaultPullConsumer) SubscriptionDataList() []*internal.SubscriptionData
- func (pc *DefaultPullConsumer) Unsubscribe(topic string) error
- func (pc *DefaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error
- func (pc *DefaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
- type ExpressionType
- type Limiter
- type MessageModel
- type MessageQueueKey
- type MessageSelector
- type MockOffsetStore
- type MockOffsetStoreMockRecorder
- type OffsetSerializeWrapper
- type OffsetStore
- type Option
- func WithAutoCommit(auto bool) Option
- func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option
- func WithConsumeFromWhere(w ConsumeFromWhere) Option
- func WithConsumeGoroutineNums(nums int) Option
- func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option
- func WithConsumeTimeout(timeout time.Duration) Option
- func WithConsumeTimestamp(consumeTimestamp string) Option
- func WithConsumerModel(m MessageModel) Option
- func WithConsumerOrder(order bool) Option
- func WithConsumerPullTimeout(consumerPullTimeout time.Duration) Option
- func WithCredentials(c primitive.Credentials) Option
- func WithFilterMessageHook(hooks []hooks.FilterMessageHook) Option
- func WithGroupName(group string) Option
- func WithInstance(name string) Option
- func WithInterceptor(fs ...primitive.Interceptor) Option
- func WithLimiter(limiter Limiter) Option
- func WithMaxReconsumeTimes(times int32) Option
- func WithNameServer(nameServers primitive.NamesrvAddr) Option
- func WithNameServerDomain(nameServerUrl string) Option
- func WithNamespace(namespace string) Option
- func WithNsResolver(resolver primitive.NsResolver) Option
- func WithPullBatchSize(batchSize int32) Option
- func WithPullInterval(interval time.Duration) Option
- func WithPullThresholdForQueue(pullThresholdForQueue int64) Option
- func WithPullThresholdForTopic(pullThresholdForTopic int) Option
- func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option
- func WithPullThresholdSizeForTopic(pullThresholdSizeForTopic int) Option
- func WithRebalanceLockInterval(interval time.Duration) Option
- func WithRetry(retries int) Option
- func WithStrategy(strategy AllocateStrategy) Option
- func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option
- func WithTrace(traceCfg *primitive.TraceConfig) Option
- func WithUnitName(unitName string) Option
- func WithVIPChannel(enable bool) Option
- type PullRequest
- type PushConsumerCallback
- type QueueLock
- type StatsManager
Constants ¶
const ( /** * <ul> * Keywords: * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li> * </ul> * <p/> * <ul> * Data type: * <li>Boolean, like: TRUE, FALSE</li> * <li>String, like: 'abc'</li> * <li>Decimal, like: 123</li> * <li>Float number, like: 3.1415</li> * </ul> * <p/> * <ul> * Grammar: * <li>{@code AND, OR}</li> * <li>{@code >, >=, <, <=, =}</li> * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li> * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li> * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li> * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li> * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li> * </ul> * <p/> * <p> * Example: * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) * </p> */ SQL92 = ExpressionType("SQL92") /** * Only support or operation such as * "tag1 || tag2 || tag3", <br> * If null or * expression, meaning subscribe all. */ TAG = ExpressionType("TAG") )
const (
Mb = 1024 * 1024
)
Variables ¶
var ErrNoNewMsg = errors.New("no new message found")
ErrNoNewMsg returns a "no new message found". Occurs only when no new message found from broker
Functions ¶
func AllocateByAveragely ¶
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByAveragelyCircle ¶
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByMachineNearby ¶
func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
TODO
func CreateReplyMessage ¶
func CreateReplyMessage(requestMessage *primitive.MessageExt, body []byte) (*primitive.Message, error)
CreateReplyMessage build reply message from the request message
func GetReplyToClient ¶
func GetReplyToClient(msg *primitive.MessageExt) string
func IsNoNewMsgError ¶
func NewPushConsumer ¶
Types ¶
type AllocateStrategy ¶
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
func AllocateByConfig ¶
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy
func AllocateByConsistentHash ¶
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy
func AllocateByMachineRoom ¶
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy
type ConsumeFromWhere ¶
type ConsumeFromWhere int
Consuming point on consumer booting. </p>
There are three consuming points: <ul> <li> <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases: <ol> <li> if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning; </li> <li> if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored. </li> </ol> </li> <li> <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. </li> <li> <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means messages born prior to {@link #consumeTimestamp} will be ignored </li> </ul>
const ( ConsumeFromLastOffset ConsumeFromWhere = iota ConsumeFromFirstOffset ConsumeFromTimestamp )
type ConsumeRequest ¶
type ConsumeRequest struct {
// contains filtered or unexported fields
}
func (*ConsumeRequest) GetMQ ¶
func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue
func (*ConsumeRequest) GetMsgList ¶
func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt
func (*ConsumeRequest) GetPQ ¶
func (cr *ConsumeRequest) GetPQ() *processQueue
type ConsumeResult ¶
type ConsumeResult int
const ( ConsumeSuccess ConsumeResult = iota ConsumeRetryLater Commit Rollback SuspendCurrentQueueAMoment )
type ConsumeResultHolder ¶
type ConsumeResultHolder struct {
ConsumeResult
}
type ConsumeStatus ¶
type ConsumeType ¶
type ConsumeType string
type ConsumerReturn ¶
type ConsumerReturn int
const ( SuccessReturn ConsumerReturn = iota ExceptionReturn NullReturn TimeoutReturn FailedReturn )
type DefaultPullConsumer ¶ added in v1.0.6
type DefaultPullConsumer struct { GroupName string Model MessageModel UnitMode bool // contains filtered or unexported fields }
func NewPullConsumer ¶
func NewPullConsumer(options ...Option) (*DefaultPullConsumer, error)
func (*DefaultPullConsumer) ACK ¶ added in v1.0.6
func (pc *DefaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, result ConsumeResult)
func (*DefaultPullConsumer) ConsumeMessageDirectly ¶ added in v1.0.6
func (pc *DefaultPullConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult
func (*DefaultPullConsumer) CurrentOffset ¶ added in v1.0.6
func (pc *DefaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error)
CurrentOffset return the current offset of queue in mem.
func (*DefaultPullConsumer) GetClient ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetClient() internal.RMQClient
func (*DefaultPullConsumer) GetConsumerRunningInfo ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo
func (*DefaultPullConsumer) GetConsumerStatus ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus
func (*DefaultPullConsumer) GetModel ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetModel() string
func (*DefaultPullConsumer) GetWhere ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetWhere() string
func (*DefaultPullConsumer) GetcType ¶ added in v1.0.6
func (pc *DefaultPullConsumer) GetcType() string
func (*DefaultPullConsumer) IsSubscribeTopicNeedUpdate ¶ added in v1.0.6
func (pc *DefaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool
func (*DefaultPullConsumer) IsUnitMode ¶ added in v1.0.6
func (pc *DefaultPullConsumer) IsUnitMode() bool
func (*DefaultPullConsumer) PersistConsumerOffset ¶ added in v1.0.6
func (pc *DefaultPullConsumer) PersistConsumerOffset() error
func (*DefaultPullConsumer) PersistOffset ¶ added in v1.0.6
func (pc *DefaultPullConsumer) PersistOffset(ctx context.Context, topic string) error
PersistOffset persist all offset in mem.
func (*DefaultPullConsumer) Poll ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Poll(ctx context.Context, timeout time.Duration) (*ConsumeRequest, error)
func (*DefaultPullConsumer) Pull ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)
func (*DefaultPullConsumer) PullFrom ¶ added in v1.0.6
func (pc *DefaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
PullFrom pull messages of queue from the offset to offset + numbers
func (*DefaultPullConsumer) Rebalance ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Rebalance()
func (*DefaultPullConsumer) RebalanceIfNotPaused ¶ added in v1.0.6
func (pc *DefaultPullConsumer) RebalanceIfNotPaused()
func (*DefaultPullConsumer) ReleaseSubGroup ¶ added in v1.0.6
func (pc *DefaultPullConsumer) ReleaseSubGroup(group string) bool
func (*DefaultPullConsumer) ResetOffset ¶ added in v1.0.6
func (pc *DefaultPullConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64)
func (*DefaultPullConsumer) ServiceRunningOk ¶ added in v1.0.6
func (pc *DefaultPullConsumer) ServiceRunningOk() bool
func (*DefaultPullConsumer) Shutdown ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Shutdown() error
Shutdown close defaultConsumer, refuse new request.
func (*DefaultPullConsumer) Start ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Start() error
func (*DefaultPullConsumer) Subscribe ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Subscribe(topic string, selector MessageSelector) error
func (*DefaultPullConsumer) SubscriptionDataList ¶ added in v1.0.6
func (pc *DefaultPullConsumer) SubscriptionDataList() []*internal.SubscriptionData
func (*DefaultPullConsumer) Unsubscribe ¶ added in v1.0.6
func (pc *DefaultPullConsumer) Unsubscribe(topic string) error
func (*DefaultPullConsumer) UpdateOffset ¶ added in v1.0.6
func (pc *DefaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error
UpdateOffset updateOffset update offset of queue in mem
func (*DefaultPullConsumer) UpdateTopicSubscribeInfo ¶ added in v1.0.6
func (pc *DefaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
type ExpressionType ¶
type ExpressionType string
type MessageModel ¶
type MessageModel int
Message model defines the way how messages are delivered to each consumer clients. </p>
RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages separately. </p>
This field defaults to clustering.
const ( BroadCasting MessageModel = iota Clustering )
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageQueueKey ¶
type MessageQueueKey primitive.MessageQueue
func (MessageQueueKey) MarshalText ¶
func (mq MessageQueueKey) MarshalText() (text []byte, err error)
func (*MessageQueueKey) UnmarshalText ¶
func (mq *MessageQueueKey) UnmarshalText(text []byte) error
type MessageSelector ¶
type MessageSelector struct { Type ExpressionType Expression string }
type MockOffsetStore ¶
type MockOffsetStore struct {
// contains filtered or unexported fields
}
MockOffsetStore is a mock of OffsetStore interface.
func NewMockOffsetStore ¶
func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore
NewMockOffsetStore creates a new mock instance.
func (*MockOffsetStore) EXPECT ¶
func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockOffsetStoreMockRecorder ¶
type MockOffsetStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.
type OffsetSerializeWrapper ¶
type OffsetSerializeWrapper struct {
OffsetTable map[MessageQueueKey]int64 `json:"offsetTable"`
}
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
func NewLocalFileOffsetStore ¶
func NewLocalFileOffsetStore(clientID, group string) OffsetStore
func NewRemoteOffsetStore ¶
type Option ¶
type Option func(*consumerOptions)
func WithAutoCommit ¶
func WithConsumeFromWhere ¶
func WithConsumeFromWhere(w ConsumeFromWhere) Option
func WithConsumeTimeout ¶
func WithConsumeTimestamp ¶
func WithConsumerModel ¶
func WithConsumerModel(m MessageModel) Option
func WithConsumerOrder ¶
func WithConsumerPullTimeout ¶
func WithCredentials ¶
func WithCredentials(c primitive.Credentials) Option
func WithFilterMessageHook ¶
func WithFilterMessageHook(hooks []hooks.FilterMessageHook) Option
func WithInstance ¶
func WithInterceptor ¶
func WithInterceptor(fs ...primitive.Interceptor) Option
WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call.
func WithLimiter ¶
func WithMaxReconsumeTimes ¶
WithMaxReconsumeTimes set MaxReconsumeTimes of options, if message reconsume greater than MaxReconsumeTimes, it will be sent to retry or dlq topic. more info reference by examples/consumer/retry.
func WithNameServer ¶
func WithNameServer(nameServers primitive.NamesrvAddr) Option
WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithNameServerDomain ¶
WithNameServerDomain set NameServer domain
func WithNamespace ¶
WithNamespace set the namespace of consumer
func WithNsResolver ¶
func WithNsResolver(resolver primitive.NsResolver) Option
WithNsResolver set nameserver resolver to fetch nameserver addr
func WithPullBatchSize ¶
func WithPullInterval ¶
func WithRetry ¶
WithRetry return a Option that specifies the retry times when send failed. TODO: use retry middleware instead
func WithStrategy ¶
func WithStrategy(strategy AllocateStrategy) Option
func WithTrace ¶
func WithTrace(traceCfg *primitive.TraceConfig) Option
WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithUnitName ¶
WithUnitName set the name of specified unit
func WithVIPChannel ¶
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
func (*PullRequest) String ¶
func (pr *PullRequest) String() string
type PushConsumerCallback ¶
type PushConsumerCallback struct {
// contains filtered or unexported fields
}
func (PushConsumerCallback) UniqueID ¶
func (callback PushConsumerCallback) UniqueID() string
type StatsManager ¶
type StatsManager struct {
// contains filtered or unexported fields
}
func NewStatsManager ¶
func NewStatsManager() *StatsManager
func (*StatsManager) GetConsumeStatus ¶
func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus
func (*StatsManager) ShutDownStat ¶
func (mgr *StatsManager) ShutDownStat()