consumer

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package consumer is a generated GoMock package.

Index

Constants

View Source
const (
	/**
	 * <ul>
	 * Keywords:
	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Data type:
	 * <li>Boolean, like: TRUE, FALSE</li>
	 * <li>String, like: 'abc'</li>
	 * <li>Decimal, like: 123</li>
	 * <li>Float number, like: 3.1415</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Grammar:
	 * <li>{@code AND, OR}</li>
	 * <li>{@code >, >=, <, <=, =}</li>
	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}; this operation only supports the String type.</li>
	 * <li>{@code IS NULL}, {@code IS NOT NULL}, checks whether the parameter is null or not.</li>
	 * <li>{@code =TRUE}, {@code =FALSE}, checks whether the parameter is true or false.</li>
	 * </ul>
	 * <p/>
	 * <p>
	 * Example:
	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	 * </p>
	 */
	SQL92 = ExpressionType("SQL92")

	/**
	 * Only supports the OR operation, such as
	 * "tag1 || tag2 || tag3".
	 * If the expression is null or *, it means subscribing to all tags.
	 */
	TAG = ExpressionType("TAG")
)
View Source
const (
	Mb = 1024 * 1024
)

Variables

View Source
var ErrNoNewMsg = errors.New("no new message found")

ErrNoNewMsg is the "no new message found" error. It occurs only when no new message is found from the broker.

Functions

func AllocateByAveragely

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByAveragelyCircle

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByMachineNearby

func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

TODO

func CreateReplyMessage

func CreateReplyMessage(requestMessage *primitive.MessageExt, body []byte) (*primitive.Message, error)

CreateReplyMessage build reply message from the request message

func GetReplyToClient

func GetReplyToClient(msg *primitive.MessageExt) string

func IsNoNewMsgError

func IsNoNewMsgError(err error) bool

func IsTagType

func IsTagType(exp string) bool

func NewPushConsumer

func NewPushConsumer(opts ...Option) (*pushConsumer, error)

Types

type AllocateStrategy

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue

func AllocateByConfig

func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy

func AllocateByConsistentHash

func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy

func AllocateByMachineRoom

func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy

type ConsumeFromWhere

type ConsumeFromWhere int

Consuming point on consumer booting.

There are three consuming points:

  - ConsumeFromLastOffset: the consumer client picks up where it stopped previously. If it is a newly booted consumer client, there are two cases depending on the age of the consumer group:
      1. if the consumer group was created so recently that the earliest message being subscribed has not yet expired, which means the consumer group represents a recently launched business, consuming will start from the very beginning;
      2. if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp will be ignored.
  - ConsumeFromFirstOffset: the consumer client will start from the earliest messages available.
  - ConsumeFromTimestamp: the consumer client will start from the specified timestamp, which means messages born prior to the configured consume timestamp will be ignored.

const (
	ConsumeFromLastOffset ConsumeFromWhere = iota
	ConsumeFromFirstOffset
	ConsumeFromTimestamp
)

type ConsumeRequest

type ConsumeRequest struct {
	// contains filtered or unexported fields
}

func (*ConsumeRequest) GetMQ

func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue

func (*ConsumeRequest) GetMsgList

func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt

func (*ConsumeRequest) GetPQ

func (cr *ConsumeRequest) GetPQ() *processQueue

type ConsumeResult

type ConsumeResult int
const (
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
	Commit
	Rollback
	SuspendCurrentQueueAMoment
)

type ConsumeResultHolder

type ConsumeResultHolder struct {
	ConsumeResult
}

type ConsumeStatus

type ConsumeStatus struct {
	PullRT            float64
	PullTPS           float64
	ConsumeRT         float64
	ConsumeOKTPS      float64
	ConsumeFailedTPS  float64
	ConsumeFailedMsgs int64
}

type ConsumeType

type ConsumeType string

type ConsumerReturn

type ConsumerReturn int
const (
	SuccessReturn ConsumerReturn = iota
	ExceptionReturn
	NullReturn
	TimeoutReturn
	FailedReturn
)

type DefaultPullConsumer added in v1.0.6

type DefaultPullConsumer struct {
	GroupName string
	Model     MessageModel
	UnitMode  bool
	// contains filtered or unexported fields
}

func NewPullConsumer

func NewPullConsumer(options ...Option) (*DefaultPullConsumer, error)

func (*DefaultPullConsumer) ACK added in v1.0.6

func (*DefaultPullConsumer) ConsumeMessageDirectly added in v1.0.6

func (pc *DefaultPullConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult

func (*DefaultPullConsumer) CurrentOffset added in v1.0.6

func (pc *DefaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error)

CurrentOffset returns the current in-memory offset of the queue.

func (*DefaultPullConsumer) GetClient added in v1.0.6

func (pc *DefaultPullConsumer) GetClient() internal.RMQClient

func (*DefaultPullConsumer) GetConsumerRunningInfo added in v1.0.6

func (pc *DefaultPullConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo

func (*DefaultPullConsumer) GetConsumerStatus added in v1.0.6

func (pc *DefaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus

func (*DefaultPullConsumer) GetModel added in v1.0.6

func (pc *DefaultPullConsumer) GetModel() string

func (*DefaultPullConsumer) GetWhere added in v1.0.6

func (pc *DefaultPullConsumer) GetWhere() string

func (*DefaultPullConsumer) GetcType added in v1.0.6

func (pc *DefaultPullConsumer) GetcType() string

func (*DefaultPullConsumer) IsSubscribeTopicNeedUpdate added in v1.0.6

func (pc *DefaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool

func (*DefaultPullConsumer) IsUnitMode added in v1.0.6

func (pc *DefaultPullConsumer) IsUnitMode() bool

func (*DefaultPullConsumer) PersistConsumerOffset added in v1.0.6

func (pc *DefaultPullConsumer) PersistConsumerOffset() error

func (*DefaultPullConsumer) PersistOffset added in v1.0.6

func (pc *DefaultPullConsumer) PersistOffset(ctx context.Context, topic string) error

PersistOffset persists all offsets held in memory.

func (*DefaultPullConsumer) Poll added in v1.0.6

func (*DefaultPullConsumer) Pull added in v1.0.6

func (pc *DefaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)

func (*DefaultPullConsumer) PullFrom added in v1.0.6

func (pc *DefaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

PullFrom pulls messages of the queue from offset to offset + numbers.

func (*DefaultPullConsumer) Rebalance added in v1.0.6

func (pc *DefaultPullConsumer) Rebalance()

func (*DefaultPullConsumer) RebalanceIfNotPaused added in v1.0.6

func (pc *DefaultPullConsumer) RebalanceIfNotPaused()

func (*DefaultPullConsumer) ReleaseSubGroup added in v1.0.6

func (pc *DefaultPullConsumer) ReleaseSubGroup(group string) bool

func (*DefaultPullConsumer) ResetOffset added in v1.0.6

func (pc *DefaultPullConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64)

func (*DefaultPullConsumer) ServiceRunningOk added in v1.0.6

func (pc *DefaultPullConsumer) ServiceRunningOk() bool

func (*DefaultPullConsumer) Shutdown added in v1.0.6

func (pc *DefaultPullConsumer) Shutdown() error

Shutdown closes the defaultConsumer and refuses new requests.

func (*DefaultPullConsumer) Start added in v1.0.6

func (pc *DefaultPullConsumer) Start() error

func (*DefaultPullConsumer) Subscribe added in v1.0.6

func (pc *DefaultPullConsumer) Subscribe(topic string, selector MessageSelector) error

func (*DefaultPullConsumer) SubscriptionDataList added in v1.0.6

func (pc *DefaultPullConsumer) SubscriptionDataList() []*internal.SubscriptionData

func (*DefaultPullConsumer) Unsubscribe added in v1.0.6

func (pc *DefaultPullConsumer) Unsubscribe(topic string) error

func (*DefaultPullConsumer) UpdateOffset added in v1.0.6

func (pc *DefaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error

UpdateOffset updates the in-memory offset of the queue.

func (*DefaultPullConsumer) UpdateTopicSubscribeInfo added in v1.0.6

func (pc *DefaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)

type ExpressionType

type ExpressionType string

type Limiter

type Limiter func(topic string)

type MessageModel

type MessageModel int

Message model defines how messages are delivered to each consumer client.

RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients in the same consumer group each consume only a share of the subscribed messages, which achieves load balancing. Conversely, if broadcasting is set, each consumer client consumes all subscribed messages separately.

This field defaults to clustering.

const (
	BroadCasting MessageModel = iota
	Clustering
)

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueueKey

type MessageQueueKey primitive.MessageQueue

func (MessageQueueKey) MarshalText

func (mq MessageQueueKey) MarshalText() (text []byte, err error)

func (*MessageQueueKey) UnmarshalText

func (mq *MessageQueueKey) UnmarshalText(text []byte) error

type MessageSelector

type MessageSelector struct {
	Type       ExpressionType
	Expression string
}

type MockOffsetStore

type MockOffsetStore struct {
	// contains filtered or unexported fields
}

MockOffsetStore is a mock of OffsetStore interface.

func NewMockOffsetStore

func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore

NewMockOffsetStore creates a new mock instance.

func (*MockOffsetStore) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

type MockOffsetStoreMockRecorder

type MockOffsetStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.

type OffsetSerializeWrapper

type OffsetSerializeWrapper struct {
	OffsetTable map[MessageQueueKey]int64 `json:"offsetTable"`
}

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(clientID, group string) OffsetStore

func NewRemoteOffsetStore

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore

type Option

type Option func(*consumerOptions)

func WithAutoCommit

func WithAutoCommit(auto bool) Option

func WithConsumeConcurrentlyMaxSpan

func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option

func WithConsumeFromWhere

func WithConsumeFromWhere(w ConsumeFromWhere) Option

func WithConsumeGoroutineNums

func WithConsumeGoroutineNums(nums int) Option

func WithConsumeMessageBatchMaxSize

func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option

func WithConsumeTimeout

func WithConsumeTimeout(timeout time.Duration) Option

func WithConsumeTimestamp

func WithConsumeTimestamp(consumeTimestamp string) Option

func WithConsumerModel

func WithConsumerModel(m MessageModel) Option

func WithConsumerOrder

func WithConsumerOrder(order bool) Option

func WithConsumerPullTimeout

func WithConsumerPullTimeout(consumerPullTimeout time.Duration) Option

func WithCredentials

func WithCredentials(c primitive.Credentials) Option

func WithFilterMessageHook

func WithFilterMessageHook(hooks []hooks.FilterMessageHook) Option

func WithGroupName

func WithGroupName(group string) Option

WithGroupName sets the group name.

func WithInstance

func WithInstance(name string) Option

func WithInterceptor

func WithInterceptor(fs ...primitive.Interceptor) Option

WithInterceptor returns an Option that specifies the chained interceptors for the consumer. The first interceptor will be the outermost, while the last interceptor will be the innermost wrapper around the real call.

func WithLimiter

func WithLimiter(limiter Limiter) Option

func WithMaxReconsumeTimes

func WithMaxReconsumeTimes(times int32) Option

WithMaxReconsumeTimes sets MaxReconsumeTimes in the options. If a message is reconsumed more than MaxReconsumeTimes times, it will be sent to the retry or DLQ topic. For more information, see examples/consumer/retry.

func WithNameServer

func WithNameServer(nameServers primitive.NamesrvAddr) Option

WithNameServer sets the NameServer address; only one NameServer cluster is supported in alpha2.

func WithNameServerDomain

func WithNameServerDomain(nameServerUrl string) Option

WithNameServerDomain set NameServer domain

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace set the namespace of consumer

func WithNsResolver

func WithNsResolver(resolver primitive.NsResolver) Option

WithNsResolver set nameserver resolver to fetch nameserver addr

func WithPullBatchSize

func WithPullBatchSize(batchSize int32) Option

func WithPullInterval

func WithPullInterval(interval time.Duration) Option

func WithPullThresholdForQueue

func WithPullThresholdForQueue(pullThresholdForQueue int64) Option

func WithPullThresholdForTopic

func WithPullThresholdForTopic(pullThresholdForTopic int) Option

func WithPullThresholdSizeForQueue

func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option

func WithPullThresholdSizeForTopic

func WithPullThresholdSizeForTopic(pullThresholdSizeForTopic int) Option

func WithRebalanceLockInterval

func WithRebalanceLockInterval(interval time.Duration) Option

func WithRetry

func WithRetry(retries int) Option

WithRetry returns an Option that specifies the number of retries when sending fails. TODO: use retry middleware instead.

func WithStrategy

func WithStrategy(strategy AllocateStrategy) Option

func WithSuspendCurrentQueueTimeMillis

func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option

func WithTrace

func WithTrace(traceCfg *primitive.TraceConfig) Option

WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.

func WithUnitName

func WithUnitName(unitName string) Option

WithUnitName set the name of specified unit

func WithVIPChannel

func WithVIPChannel(enable bool) Option

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

func (*PullRequest) String

func (pr *PullRequest) String() string

type PushConsumerCallback

type PushConsumerCallback struct {
	// contains filtered or unexported fields
}

func (PushConsumerCallback) UniqueID

func (callback PushConsumerCallback) UniqueID() string

type QueueLock

type QueueLock struct {
	// contains filtered or unexported fields
}

type StatsManager

type StatsManager struct {
	// contains filtered or unexported fields
}

func NewStatsManager

func NewStatsManager() *StatsManager

func (*StatsManager) GetConsumeStatus

func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus

func (*StatsManager) ShutDownStat

func (mgr *StatsManager) ShutDownStat()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL