Documentation ¶
Overview ¶
Package consumer is a generated GoMock package.
Index ¶
- Constants
- Variables
- func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func IsTagType(exp string) bool
- func NewPullConsumer(options ...Option) (*defaultPullConsumer, error)
- func NewPushConsumer(opts ...Option) (*pushConsumer, error)
- func ShutDownStatis()
- type AllocateStrategy
- type ConsumeFromWhere
- type ConsumeResult
- type ConsumeResultHolder
- type ConsumeStatus
- type ConsumeType
- type ConsumerReturn
- type ExpressionType
- type MessageModel
- type MessageQueueKey
- type MessageSelector
- type MockOffsetStore
- type MockOffsetStoreMockRecorder
- type OffsetSerializeWrapper
- type OffsetStore
- type Option
- func WithAutoCommit(auto bool) Option
- func WithConsumeFromWhere(w ConsumeFromWhere) Option
- func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option
- func WithConsumerModel(m MessageModel) Option
- func WithConsumerOrder(order bool) Option
- func WithCredentials(c primitive.Credentials) Option
- func WithGroupName(group string) Option
- func WithInstance(name string) Option
- func WithInterceptor(fs ...primitive.Interceptor) Option
- func WithMaxReconsumeTimes(times int32) Option
- func WithNameServer(nameServers primitive.NamesrvAddr) Option
- func WithNameServerDomain(nameServerUrl string) Option
- func WithNamespace(namespace string) Option
- func WithNsResovler(resolver primitive.NsResolver) Option
- func WithPullBatchSize(batchSize int32) Option
- func WithPullInterval(interval time.Duration) Option
- func WithRebalanceLockInterval(interval time.Duration) Option
- func WithRetry(retries int) Option
- func WithStrategy(strategy AllocateStrategy) Option
- func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option
- func WithTrace(traceCfg *primitive.TraceConfig) Option
- func WithVIPChannel(enable bool) Option
- type PullConsumer
- type PullRequest
- type PushConsumerCallback
- type QueueLock
Constants ¶
const ( /** * <ul> * Keywords: * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li> * </ul> * <p/> * <ul> * Data type: * <li>Boolean, like: TRUE, FALSE</li> * <li>String, like: 'abc'</li> * <li>Decimal, like: 123</li> * <li>Float number, like: 3.1415</li> * </ul> * <p/> * <ul> * Grammar: * <li>{@code AND, OR}</li> * <li>{@code >, >=, <, <=, =}</li> * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li> * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li> * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li> * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li> * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li> * </ul> * <p/> * <p> * Example: * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) * </p> */ SQL92 = ExpressionType("SQL92") /** * Only support or operation such as * "tag1 || tag2 || tag3", <br> * If null or * expression, meaning subscribe all. */ TAG = ExpressionType("TAG") )
const (
Mb = 1024 * 1024
)
Variables ¶
var ( ErrCreated = errors.New("consumer group has been created") ErrBrokerNotFound = errors.New("broker can not found") )
Functions ¶
func AllocateByAveragely ¶
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByAveragelyCircle ¶
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByMachineNearby ¶
func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
TODO
func NewPullConsumer ¶
func NewPushConsumer ¶
func ShutDownStatis ¶
func ShutDownStatis()
Types ¶
type AllocateStrategy ¶
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
func AllocateByConfig ¶
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy
func AllocateByConsistentHash ¶
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy
func AllocateByMachineRoom ¶
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy
type ConsumeFromWhere ¶
type ConsumeFromWhere int
Consuming point on consumer booting.
There are three consuming points:
- ConsumeFromLastOffset: the consumer client picks up where it stopped previously. If it is a newly booted consumer client, there are two cases, depending on the age of the consumer group: (1) if the consumer group was created so recently that the earliest subscribed message has yet to expire, which means the consumer group represents a recently launched business, consuming will start from the very beginning; (2) if the earliest subscribed message has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp will be ignored.
- ConsumeFromFirstOffset: the consumer client will start from the earliest messages available.
- ConsumeFromTimestamp: the consumer client will start from the specified timestamp, which means messages born prior to the configured consume timestamp will be ignored.
const ( ConsumeFromLastOffset ConsumeFromWhere = iota ConsumeFromFirstOffset ConsumeFromTimestamp )
type ConsumeResult ¶
type ConsumeResult int
const ( ConsumeSuccess ConsumeResult = iota ConsumeRetryLater Commit Rollback SuspendCurrentQueueAMoment )
type ConsumeResultHolder ¶
type ConsumeResultHolder struct {
ConsumeResult
}
type ConsumeStatus ¶
type ConsumeStatus struct { PullRT float64 PullTPS float64 ConsumeRT float64 ConsumeOKTPS float64 ConsumeFailedTPS float64 ConsumeFailedMsgs int64 }
func GetConsumeStatus ¶
func GetConsumeStatus(group, topic string) ConsumeStatus
type ConsumeType ¶
type ConsumeType string
type ConsumerReturn ¶
type ConsumerReturn int
const ( SuccessReturn ConsumerReturn = iota ExceptionReturn NullReturn TimeoutReturn FailedReturn )
type ExpressionType ¶
type ExpressionType string
type MessageModel ¶
type MessageModel int
Message model defines how messages are delivered to each consumer client.
RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same consumer group each consume only a share of the subscribed messages, which achieves load balancing; conversely, if broadcasting is set, each consumer client consumes all subscribed messages separately.
This field defaults to clustering.
const ( BroadCasting MessageModel = iota Clustering )
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageQueueKey ¶
type MessageQueueKey primitive.MessageQueue
func (MessageQueueKey) MarshalText ¶
func (mq MessageQueueKey) MarshalText() (text []byte, err error)
func (*MessageQueueKey) UnmarshalText ¶
func (mq *MessageQueueKey) UnmarshalText(text []byte) error
type MessageSelector ¶
type MessageSelector struct { Type ExpressionType Expression string }
type MockOffsetStore ¶
type MockOffsetStore struct {
// contains filtered or unexported fields
}
MockOffsetStore is a mock of OffsetStore interface
func NewMockOffsetStore ¶
func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore
NewMockOffsetStore creates a new mock instance
func (*MockOffsetStore) EXPECT ¶
func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
type MockOffsetStoreMockRecorder ¶
type MockOffsetStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore
type OffsetSerializeWrapper ¶
type OffsetSerializeWrapper struct {
OffsetTable map[MessageQueueKey]int64 `json:"offsetTable"`
}
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
func NewLocalFileOffsetStore ¶
func NewLocalFileOffsetStore(clientID, group string) OffsetStore
func NewRemoteOffsetStore ¶
type Option ¶
type Option func(*consumerOptions)
func WithAutoCommit ¶
func WithConsumeFromWhere ¶
func WithConsumeFromWhere(w ConsumeFromWhere) Option
func WithConsumerModel ¶
func WithConsumerModel(m MessageModel) Option
func WithConsumerOrder ¶
func WithCredentials ¶
func WithCredentials(c primitive.Credentials) Option
func WithInstance ¶
func WithInterceptor ¶
func WithInterceptor(fs ...primitive.Interceptor) Option
WithInterceptor returns an Option that specifies the chained interceptors for the consumer. The first interceptor will be the outermost, while the last interceptor will be the innermost wrapper around the real call.
func WithMaxReconsumeTimes ¶
WithMaxReconsumeTimes sets MaxReconsumeTimes of the options. If a message is reconsumed more than MaxReconsumeTimes times, it will be sent to the retry or DLQ topic. For more information, see examples/consumer/retry.
func WithNameServer ¶
func WithNameServer(nameServers primitive.NamesrvAddr) Option
WithNameServer sets the NameServer address; only one NameServer cluster is supported in alpha2.
func WithNameServerDomain ¶
WithNameServerDomain sets the NameServer domain.
func WithNamespace ¶
WithNamespace sets the namespace of the consumer.
func WithNsResovler ¶
func WithNsResovler(resolver primitive.NsResolver) Option
WithNsResovler sets the nameserver resolver used to fetch the nameserver address.
func WithPullBatchSize ¶
func WithPullInterval ¶
func WithRetry ¶
WithRetry returns an Option that specifies the number of retries when sending fails. TODO: use retry middleware instead.
func WithStrategy ¶
func WithStrategy(strategy AllocateStrategy) Option
func WithTrace ¶
func WithTrace(traceCfg *primitive.TraceConfig) Option
WithTrace enables RocketMQ message trace support; see https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithVIPChannel ¶
type PullConsumer ¶
type PullConsumer interface { // Start Start() // Shutdown refuse all new pull operation, finish all submitted. Shutdown() // Pull pull message of topic, selector indicate which queue to pull. Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) // PullFrom pull messages of queue from the offset to offset + numbers PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) // updateOffset update offset of queue in mem UpdateOffset(queue *primitive.MessageQueue, offset int64) error // PersistOffset persist all offset in mem. PersistOffset(ctx context.Context) error // CurrentOffset return the current offset of queue in mem. CurrentOffset(queue *primitive.MessageQueue) (int64, error) }
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
func (*PullRequest) String ¶
func (pr *PullRequest) String() string
type PushConsumerCallback ¶
type PushConsumerCallback struct {
// contains filtered or unexported fields
}
func (PushConsumerCallback) UniqueID ¶
func (callback PushConsumerCallback) UniqueID() string